<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>config-netty-config</artifactId>
+ <!--
+ note, the reason the type and classifier
+ are here instead of in opendaylight/commons/opendaylight/pom.xml
+ is because they are used as jars in distribution.
+ -->
+ <version>${config.version}</version>
+ <type>xml</type>
+ <classifier>config</classifier>
</dependency>
<!-- test to validate features.xml -->
<dependency>
</properties>
<dependencies>
+ <!-- dependency for opendaylight-karaf-empty for use by testing -->
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>opendaylight-karaf-empty</artifactId>
+ <version>1.4.2-SNAPSHOT</version>
+ <type>zip</type>
+ </dependency>
<dependency>
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>features-yangtools</artifactId>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>md-sal-config</artifactId>
+ <version>${mdsal.version}</version>
+ <type>xml</type>
+ <classifier>config</classifier>
</dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>netconf-connector-config</artifactId>
+ <version>${netconf.version}</version>
+ <type>xml</type>
+ <classifier>config</classifier>
</dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-rest-connector-config</artifactId>
+ <version>${mdsal.version}</version>
+ <type>xml</type>
+ <classifier>config</classifier>
</dependency>
<dependency>
<groupId>org.opendaylight.controller.samples</groupId>
<dependency>
<groupId>org.opendaylight.controller.samples</groupId>
<artifactId>toaster-config</artifactId>
+ <version>${mdsal.version}</version>
+ <type>xml</type>
+ <classifier>config</classifier>
</dependency>
<!-- test to validate features.xml -->
<dependency>
actorSystem.eventStream().subscribe(mockReceiver.getRef(), DeadLetter.class);
- final FiniteDuration ONE_SEC = new FiniteDuration(1, TimeUnit.SECONDS);
+ final FiniteDuration TEN_SEC = new FiniteDuration(10, TimeUnit.SECONDS);
String boundedMailBox = actorSystem.name() + ".bounded-mailbox";
ActorRef pingPongActor = actorSystem.actorOf(PingPongActor.props(lock).withMailbox(boundedMailBox),
"pingpongactor");
pingPongActor.tell("ping", mockReceiver.getRef());
}
- mockReceiver.expectMsgClass(ONE_SEC, DeadLetter.class);
+ mockReceiver.expectMsgClass(TEN_SEC, DeadLetter.class);
lock.unlock();
- Object[] eleven = mockReceiver.receiveN(11, ONE_SEC);
+ Object[] eleven = mockReceiver.receiveN(11, TEN_SEC);
}
/**
*/
package org.opendaylight.controller.md.sal.dom.store.impl;
+import com.google.common.base.Preconditions;
+
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.yangtools.util.concurrent.NotificationManager;
import org.slf4j.LoggerFactory;
class ChangeListenerNotifyTask implements Runnable {
-
private static final Logger LOG = LoggerFactory.getLogger(ChangeListenerNotifyTask.class);
- private final Iterable<? extends DataChangeListenerRegistration<?>> listeners;
- private final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> event;
-
@SuppressWarnings("rawtypes")
- private final NotificationManager<AsyncDataChangeListener,AsyncDataChangeEvent>
- notificationMgr;
+ private final NotificationManager<AsyncDataChangeListener,AsyncDataChangeEvent> notificationMgr;
+ private final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> event;
+ private final DataChangeListenerRegistration<?> listener;
@SuppressWarnings("rawtypes")
- public ChangeListenerNotifyTask(final Iterable<? extends DataChangeListenerRegistration<?>> listeners,
+ public ChangeListenerNotifyTask(final DataChangeListenerRegistration<?> listener,
final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> event,
final NotificationManager<AsyncDataChangeListener,AsyncDataChangeEvent> notificationMgr) {
- this.listeners = listeners;
- this.event = event;
- this.notificationMgr = notificationMgr;
+ this.notificationMgr = Preconditions.checkNotNull(notificationMgr);
+ this.listener = Preconditions.checkNotNull(listener);
+ this.event = Preconditions.checkNotNull(event);
}
@Override
public void run() {
-
- for (DataChangeListenerRegistration<?> listener : listeners) {
- notificationMgr.submitNotification(listener.getInstance(), event);
+ final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> l = listener.getInstance();
+ if (l == null) {
+ LOG.trace("Skipping event delivery to unregistered listener {}", l);
+ return;
}
+ LOG.trace("Listener {} event {}", l, event);
+
+ // FIXME: Yo dawg I heard you like queues, so this was queued to be queued
+ notificationMgr.submitNotification(l, event);
}
@Override
public String toString() {
- return "ChangeListenerNotifyTask [listeners=" + listeners + ", event=" + event + "]";
+ return "ChangeListenerNotifyTask [listener=" + listener + ", event=" + event + "]";
}
}
*/
package org.opendaylight.controller.md.sal.dom.store.impl;
+import com.google.common.base.Preconditions;
+
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import com.google.common.base.Preconditions;
-
public final class DOMImmutableDataChangeEvent implements
AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> {
updated.put(path, after);
return this;
}
+
+ public boolean isEmpty() {
+ return created.isEmpty() && removed.isEmpty() && updated.isEmpty();
+ }
}
private static final class RemoveEventFactory implements SimpleEventFactory {
import javax.annotation.concurrent.GuardedBy;
-import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
.addCreated(path, data) //
.build();
- new ChangeListenerNotifyTask(Collections.singletonList(reg), event,
+ new ChangeListenerNotifyTask(reg, event,
dataChangeListenerNotificationManager).run();
}
}
*/
package org.opendaylight.controller.md.sal.dom.store.impl;
-import static org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.builder;
-
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map.Entry;
-import java.util.Set;
import java.util.concurrent.Callable;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder;
import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.SimpleEventFactory;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
-import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree.Node;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree.Walker;
import org.opendaylight.yangtools.util.concurrent.NotificationManager;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodeContainer;
*/
final class ResolveDataChangeEventsTask implements Callable<Iterable<ChangeListenerNotifyTask>> {
private static final Logger LOG = LoggerFactory.getLogger(ResolveDataChangeEventsTask.class);
- private static final DOMImmutableDataChangeEvent NO_CHANGE = builder(DataChangeScope.BASE).build();
- private final Multimap<ListenerTree.Node, DOMImmutableDataChangeEvent> events = HashMultimap.create();
+ @SuppressWarnings("rawtypes")
+ private final NotificationManager<AsyncDataChangeListener, AsyncDataChangeEvent> notificationMgr;
private final DataTreeCandidate candidate;
private final ListenerTree listenerRoot;
- @SuppressWarnings("rawtypes")
- private final NotificationManager<AsyncDataChangeListener, AsyncDataChangeEvent> notificationMgr;
+ private Multimap<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> collectedEvents;
@SuppressWarnings("rawtypes")
public ResolveDataChangeEventsTask(final DataTreeCandidate candidate, final ListenerTree listenerTree,
* order to delivery data change events.
*/
@Override
- public Iterable<ChangeListenerNotifyTask> call() {
+ public synchronized Iterable<ChangeListenerNotifyTask> call() {
try (final Walker w = listenerRoot.getWalker()) {
- resolveAnyChangeEvent(candidate.getRootPath(), Collections.singleton(w.getRootNode()), candidate.getRootNode());
- return createNotificationTasks();
- }
- }
-
- /**
- *
- * Walks map of listeners to data change events, creates notification
- * delivery tasks.
- *
- * Walks map of registered and affected listeners and creates notification
- * tasks from set of listeners and events to be delivered.
- *
- * If set of listeners has more then one event (applicable to wildcarded
- * listeners), merges all data change events into one, final which contains
- * all separate updates.
- *
- * Dispatch between merge variant and reuse variant of notification task is
- * done in
- * {@link #addNotificationTask(com.google.common.collect.ImmutableList.Builder, Node, java.util.Collection)}
- *
- * @return Collection of notification tasks.
- */
- private Collection<ChangeListenerNotifyTask> createNotificationTasks() {
- ImmutableList.Builder<ChangeListenerNotifyTask> taskListBuilder = ImmutableList.builder();
- for (Entry<ListenerTree.Node, Collection<DOMImmutableDataChangeEvent>> entry : events.asMap().entrySet()) {
- addNotificationTask(taskListBuilder, entry.getKey(), entry.getValue());
- }
- return taskListBuilder.build();
- }
-
- /**
- * Adds notification task to task list.
- *
- * If entry collection contains one event, this event is reused and added to
- * notification tasks for listeners (see
- * {@link #addNotificationTaskByScope(com.google.common.collect.ImmutableList.Builder, Node, DOMImmutableDataChangeEvent)}
- * . Otherwise events are merged by scope and distributed between listeners
- * to particular scope. See
- * {@link #addNotificationTasksAndMergeEvents(com.google.common.collect.ImmutableList.Builder, Node, java.util.Collection)}
- * .
- *
- * @param taskListBuilder
- * @param listeners
- * @param entries
- */
- private void addNotificationTask(final ImmutableList.Builder<ChangeListenerNotifyTask> taskListBuilder,
- final ListenerTree.Node listeners, final Collection<DOMImmutableDataChangeEvent> entries) {
-
- if (!entries.isEmpty()) {
- if (entries.size() == 1) {
- addNotificationTaskByScope(taskListBuilder, listeners, Iterables.getOnlyElement(entries));
- } else {
- addNotificationTasksAndMergeEvents(taskListBuilder, listeners, entries);
- }
- }
- }
+ // Defensive: reset internal state
+ collectedEvents = ArrayListMultimap.create();
- /**
- *
- * Add notification deliveries task to the listener.
- *
- *
- * @param taskListBuilder
- * @param listeners
- * @param event
- */
- private void addNotificationTaskByScope(
- final ImmutableList.Builder<ChangeListenerNotifyTask> taskListBuilder, final ListenerTree.Node listeners,
- final DOMImmutableDataChangeEvent event) {
- DataChangeScope eventScope = event.getScope();
- for (DataChangeListenerRegistration<?> listenerReg : listeners.getListeners()) {
- DataChangeScope listenerScope = listenerReg.getScope();
- List<DataChangeListenerRegistration<?>> listenerSet = Collections
- .<DataChangeListenerRegistration<?>> singletonList(listenerReg);
- if (eventScope == DataChangeScope.BASE) {
- taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event, notificationMgr));
- } else if (eventScope == DataChangeScope.ONE && listenerScope != DataChangeScope.BASE) {
- taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event, notificationMgr));
- } else if (eventScope == DataChangeScope.SUBTREE && listenerScope == DataChangeScope.SUBTREE) {
- taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event, notificationMgr));
- }
- }
- }
+ // Run through the tree
+ final ResolveDataChangeState s = ResolveDataChangeState.initial(candidate.getRootPath(), w.getRootNode());
+ resolveAnyChangeEvent(s, candidate.getRootNode());
- /**
- *
- * Add notification tasks with merged event
- *
- * Separate Events by scope and creates merged notification tasks for each
- * and every scope which is present.
- *
- * Adds merged events to task list based on scope requested by client.
- *
- * @param taskListBuilder
- * @param listeners
- * @param entries
- */
- private void addNotificationTasksAndMergeEvents(
- final ImmutableList.Builder<ChangeListenerNotifyTask> taskListBuilder, final ListenerTree.Node listeners,
- final Collection<DOMImmutableDataChangeEvent> entries) {
-
- final Builder baseBuilder = builder(DataChangeScope.BASE);
- final Builder oneBuilder = builder(DataChangeScope.ONE);
- final Builder subtreeBuilder = builder(DataChangeScope.SUBTREE);
-
- boolean baseModified = false;
- boolean oneModified = false;
- boolean subtreeModified = false;
- for (final DOMImmutableDataChangeEvent entry : entries) {
- switch (entry.getScope()) {
- // Absence of breaks is intentional here. Subtree contains base and
- // one, one also contains base
- case BASE:
- baseBuilder.merge(entry);
- baseModified = true;
- case ONE:
- oneBuilder.merge(entry);
- oneModified = true;
- case SUBTREE:
- subtreeBuilder.merge(entry);
- subtreeModified = true;
+ /*
+ * Convert to tasks, but be mindful of multiple values -- those indicate multiple
+ * wildcard matches, which need to be merged.
+ */
+ final Collection<ChangeListenerNotifyTask> ret = new ArrayList<>();
+ for (Entry<DataChangeListenerRegistration<?>, Collection<DOMImmutableDataChangeEvent>> e : collectedEvents.asMap().entrySet()) {
+ final Collection<DOMImmutableDataChangeEvent> col = e.getValue();
+ final DOMImmutableDataChangeEvent event;
+
+ if (col.size() != 1) {
+ final Builder b = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE);
+ for (DOMImmutableDataChangeEvent i : col) {
+ b.merge(i);
+ }
+
+ event = b.build();
+ LOG.trace("Merged events {} into event {}", col, event);
+ } else {
+ event = col.iterator().next();
+ }
+
+ ret.add(new ChangeListenerNotifyTask(e.getKey(), event, notificationMgr));
}
- }
- if (baseModified) {
- addNotificationTaskExclusively(taskListBuilder, listeners, baseBuilder.build());
- }
- if (oneModified) {
- addNotificationTaskExclusively(taskListBuilder, listeners, oneBuilder.build());
- }
- if (subtreeModified) {
- addNotificationTaskExclusively(taskListBuilder, listeners, subtreeBuilder.build());
- }
- }
-
- private void addNotificationTaskExclusively(
- final ImmutableList.Builder<ChangeListenerNotifyTask> taskListBuilder, final Node listeners,
- final DOMImmutableDataChangeEvent event) {
- for (DataChangeListenerRegistration<?> listener : listeners.getListeners()) {
- if (listener.getScope() == event.getScope()) {
- Set<DataChangeListenerRegistration<?>> listenerSet = Collections
- .<DataChangeListenerRegistration<?>> singleton(listener);
- taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event, notificationMgr));
- }
+ // FIXME: so now we have tasks to submit tasks... Inception-style!
+ LOG.debug("Created tasks {}", ret);
+ return ret;
}
}
* - Original (before) state of current node
* @param after
* - After state of current node
- * @return Data Change Event of this node and all it's children
+ * @return True if the subtree changed, false otherwise
*/
- private DOMImmutableDataChangeEvent resolveAnyChangeEvent(final YangInstanceIdentifier path,
- final Collection<ListenerTree.Node> listeners, final DataTreeCandidateNode node) {
-
+ private boolean resolveAnyChangeEvent(final ResolveDataChangeState state, final DataTreeCandidateNode node) {
if (node.getModificationType() != ModificationType.UNMODIFIED &&
!node.getDataAfter().isPresent() && !node.getDataBefore().isPresent()) {
LOG.debug("Modification at {} has type {}, but no before- and after-data. Assuming unchanged.",
- path, node.getModificationType());
- return NO_CHANGE;
+ state.getPath(), node.getModificationType());
+ return false;
}
// no before and after state is present
switch (node.getModificationType()) {
case SUBTREE_MODIFIED:
- return resolveSubtreeChangeEvent(path, listeners, node);
+ return resolveSubtreeChangeEvent(state, node);
case MERGE:
case WRITE:
Preconditions.checkArgument(node.getDataAfter().isPresent(),
- "Modification at {} has type {} but no after-data", path, node.getModificationType());
- if (node.getDataBefore().isPresent()) {
- return resolveReplacedEvent(path, listeners, node.getDataBefore().get(), node.getDataAfter().get());
- } else {
- return resolveCreateEvent(path, listeners, node.getDataAfter().get());
+ "Modification at {} has type {} but no after-data", state.getPath(), node.getModificationType());
+ if (!node.getDataBefore().isPresent()) {
+ resolveCreateEvent(state, node.getDataAfter().get());
+ return true;
}
+
+ return resolveReplacedEvent(state, node.getDataBefore().get(), node.getDataAfter().get());
case DELETE:
Preconditions.checkArgument(node.getDataBefore().isPresent(),
- "Modification at {} has type {} but no before-data", path, node.getModificationType());
- return resolveDeleteEvent(path, listeners, node.getDataBefore().get());
+ "Modification at {} has type {} but no before-data", state.getPath(), node.getModificationType());
+ resolveDeleteEvent(state, node.getDataBefore().get());
+ return true;
case UNMODIFIED:
- return NO_CHANGE;
+ return false;
}
- throw new IllegalStateException(String.format("Unhandled node state %s at %s", node.getModificationType(), path));
+ throw new IllegalStateException(String.format("Unhandled node state %s at %s", node.getModificationType(), state.getPath()));
}
- private DOMImmutableDataChangeEvent resolveReplacedEvent(final YangInstanceIdentifier path,
- final Collection<Node> listeners, final NormalizedNode<?, ?> beforeData,
- final NormalizedNode<?, ?> afterData) {
-
- // FIXME: BUG-1493: check the listeners to prune unneeded changes:
- // for subtrees, we have to do all
- // for one, we need to expand children
- // for base, we just report replacement
+ private boolean resolveReplacedEvent(final ResolveDataChangeState state,
+ final NormalizedNode<?, ?> beforeData, final NormalizedNode<?, ?> afterData) {
if (beforeData instanceof NormalizedNodeContainer<?, ?, ?>) {
- // Node is container (contains child) and we have interested
- // listeners registered for it, that means we need to do
- // resolution of changes on children level and can not
- // shortcut resolution.
- LOG.trace("Resolving subtree replace event for {} before {}, after {}",path,beforeData,afterData);
+ /*
+ * Node is a container (contains a child) and we have interested
+ * listeners registered for it, that means we need to do
+ * resolution of changes on children level and can not
+ * shortcut resolution.
+ */
+ LOG.trace("Resolving subtree replace event for {} before {}, after {}", state.getPath(), beforeData, afterData);
@SuppressWarnings("unchecked")
NormalizedNodeContainer<?, PathArgument, NormalizedNode<PathArgument, ?>> beforeCont = (NormalizedNodeContainer<?, PathArgument, NormalizedNode<PathArgument, ?>>) beforeData;
@SuppressWarnings("unchecked")
NormalizedNodeContainer<?, PathArgument, NormalizedNode<PathArgument, ?>> afterCont = (NormalizedNodeContainer<?, PathArgument, NormalizedNode<PathArgument, ?>>) afterData;
- return resolveNodeContainerReplaced(path, listeners, beforeCont, afterCont);
- } else if (!beforeData.equals(afterData)) {
- // Node is Leaf type (does not contain child nodes)
- // so normal equals method is sufficient for determining change.
- LOG.trace("Resolving leaf replace event for {} , before {}, after {}",path,beforeData,afterData);
- DOMImmutableDataChangeEvent event = builder(DataChangeScope.BASE).setBefore(beforeData).setAfter(afterData)
- .addUpdated(path, beforeData, afterData).build();
- addPartialTask(listeners, event);
- return event;
- } else {
- return NO_CHANGE;
+ return resolveNodeContainerReplaced(state, beforeCont, afterCont);
}
+
+ // Node is a Leaf type (does not contain child nodes)
+ // so normal equals method is sufficient for determining change.
+ if (beforeData.equals(afterData)) {
+ LOG.trace("Skipping equal leaf {}", state.getPath());
+ return false;
+ }
+
+ LOG.trace("Resolving leaf replace event for {} , before {}, after {}", state.getPath(), beforeData, afterData);
+ DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE).addUpdated(state.getPath(), beforeData, afterData).build();
+ state.addEvent(event);
+ state.collectEvents(beforeData, afterData, collectedEvents);
+ return true;
}
- private DOMImmutableDataChangeEvent resolveNodeContainerReplaced(final YangInstanceIdentifier path,
- final Collection<Node> listeners,
+ private boolean resolveNodeContainerReplaced(final ResolveDataChangeState state,
final NormalizedNodeContainer<?, PathArgument, NormalizedNode<PathArgument, ?>> beforeCont,
final NormalizedNodeContainer<?, PathArgument, NormalizedNode<PathArgument, ?>> afterCont) {
- final List<DOMImmutableDataChangeEvent> childChanges = new LinkedList<>();
+ if (!state.needsProcessing()) {
+ LOG.trace("Not processing replaced container {}", state.getPath());
+ return true;
+ }
// We look at all children from before and compare it with after state.
+ boolean childChanged = false;
for (NormalizedNode<PathArgument, ?> beforeChild : beforeCont.getValue()) {
final PathArgument childId = beforeChild.getIdentifier();
- YangInstanceIdentifier childPath = path.node(childId);
- Collection<ListenerTree.Node> childListeners = getListenerChildrenWildcarded(listeners, childId);
- Optional<NormalizedNode<PathArgument, ?>> afterChild = afterCont.getChild(childId);
- DOMImmutableDataChangeEvent childChange = resolveNodeContainerChildUpdated(childPath, childListeners,
- beforeChild, afterChild);
- // If change is empty (equals to NO_CHANGE)
- if (childChange != NO_CHANGE) {
- childChanges.add(childChange);
+ if (resolveNodeContainerChildUpdated(state.child(childId), beforeChild, afterCont.getChild(childId))) {
+ childChanged = true;
}
}
* created.
*/
if (!beforeCont.getChild(childId).isPresent()) {
- Collection<ListenerTree.Node> childListeners = getListenerChildrenWildcarded(listeners, childId);
- YangInstanceIdentifier childPath = path.node(childId);
- childChanges.add(resolveSameEventRecursivelly(childPath , childListeners, afterChild,
- DOMImmutableDataChangeEvent.getCreateEventFactory()));
+ resolveSameEventRecursivelly(state.child(childId), afterChild, DOMImmutableDataChangeEvent.getCreateEventFactory());
+ childChanged = true;
}
}
- if (childChanges.isEmpty()) {
- return NO_CHANGE;
- }
- Builder eventBuilder = builder(DataChangeScope.BASE) //
- .setBefore(beforeCont) //
- .setAfter(afterCont)
- .addUpdated(path, beforeCont, afterCont);
- for (DOMImmutableDataChangeEvent childChange : childChanges) {
- eventBuilder.merge(childChange);
+ if (childChanged) {
+ DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE)
+ .addUpdated(state.getPath(), beforeCont, afterCont).build();
+ state.addEvent(event);
}
- DOMImmutableDataChangeEvent replaceEvent = eventBuilder.build();
- addPartialTask(listeners, replaceEvent);
- return replaceEvent;
+ state.collectEvents(beforeCont, afterCont, collectedEvents);
+ return childChanged;
}
- private DOMImmutableDataChangeEvent resolveNodeContainerChildUpdated(final YangInstanceIdentifier path,
- final Collection<Node> listeners, final NormalizedNode<PathArgument, ?> before,
- final Optional<NormalizedNode<PathArgument, ?>> after) {
-
+ private boolean resolveNodeContainerChildUpdated(final ResolveDataChangeState state,
+ final NormalizedNode<PathArgument, ?> before, final Optional<NormalizedNode<PathArgument, ?>> after) {
if (after.isPresent()) {
// REPLACE or SUBTREE Modified
- return resolveReplacedEvent(path, listeners, before, after.get());
-
- } else {
- // AFTER state is not present - child was deleted.
- return resolveSameEventRecursivelly(path, listeners, before,
- DOMImmutableDataChangeEvent.getRemoveEventFactory());
+ return resolveReplacedEvent(state, before, after.get());
}
+
+ // AFTER state is not present - child was deleted.
+ resolveSameEventRecursivelly(state, before, DOMImmutableDataChangeEvent.getRemoveEventFactory());
+ return true;
}
/**
* Resolves create events deep down the interest listener tree.
*
- *
* @param path
* @param listeners
* @param afterState
* @return
*/
- private DOMImmutableDataChangeEvent resolveCreateEvent(final YangInstanceIdentifier path,
- final Collection<ListenerTree.Node> listeners, final NormalizedNode<?, ?> afterState) {
+ private void resolveCreateEvent(final ResolveDataChangeState state, final NormalizedNode<?, ?> afterState) {
@SuppressWarnings({ "unchecked", "rawtypes" })
final NormalizedNode<PathArgument, ?> node = (NormalizedNode) afterState;
- return resolveSameEventRecursivelly(path, listeners, node, DOMImmutableDataChangeEvent.getCreateEventFactory());
+ resolveSameEventRecursivelly(state, node, DOMImmutableDataChangeEvent.getCreateEventFactory());
}
- private DOMImmutableDataChangeEvent resolveDeleteEvent(final YangInstanceIdentifier path,
- final Collection<ListenerTree.Node> listeners, final NormalizedNode<?, ?> beforeState) {
-
+ private void resolveDeleteEvent(final ResolveDataChangeState state, final NormalizedNode<?, ?> beforeState) {
@SuppressWarnings({ "unchecked", "rawtypes" })
final NormalizedNode<PathArgument, ?> node = (NormalizedNode) beforeState;
- return resolveSameEventRecursivelly(path, listeners, node, DOMImmutableDataChangeEvent.getRemoveEventFactory());
+ resolveSameEventRecursivelly(state, node, DOMImmutableDataChangeEvent.getRemoveEventFactory());
}
- private DOMImmutableDataChangeEvent resolveSameEventRecursivelly(final YangInstanceIdentifier path,
- final Collection<Node> listeners, final NormalizedNode<PathArgument, ?> node,
- final SimpleEventFactory eventFactory) {
- final DOMImmutableDataChangeEvent event = eventFactory.create(path, node);
- DOMImmutableDataChangeEvent propagateEvent = event;
+ private void resolveSameEventRecursivelly(final ResolveDataChangeState state,
+ final NormalizedNode<PathArgument, ?> node, final SimpleEventFactory eventFactory) {
+ if (!state.needsProcessing()) {
+ LOG.trace("Skipping child {}", state.getPath());
+ return;
+ }
+
// We have listeners for this node or it's children, so we will try
// to do additional processing
if (node instanceof NormalizedNodeContainer<?, ?, ?>) {
- LOG.trace("Resolving subtree recursive event for {}, type {}", path, eventFactory);
-
- Builder eventBuilder = builder(DataChangeScope.BASE);
- eventBuilder.merge(event);
- eventBuilder.setBefore(event.getOriginalSubtree());
- eventBuilder.setAfter(event.getUpdatedSubtree());
+ LOG.trace("Resolving subtree recursive event for {}, type {}", state.getPath(), eventFactory);
// Node has children, so we will try to resolve it's children
// changes.
@SuppressWarnings("unchecked")
NormalizedNodeContainer<?, PathArgument, NormalizedNode<PathArgument, ?>> container = (NormalizedNodeContainer<?, PathArgument, NormalizedNode<PathArgument, ?>>) node;
for (NormalizedNode<PathArgument, ?> child : container.getValue()) {
- PathArgument childId = child.getIdentifier();
+ final PathArgument childId = child.getIdentifier();
+
LOG.trace("Resolving event for child {}", childId);
- Collection<Node> childListeners = getListenerChildrenWildcarded(listeners, childId);
- eventBuilder.merge(resolveSameEventRecursivelly(path.node(childId), childListeners, child, eventFactory));
+ resolveSameEventRecursivelly(state.child(childId), child, eventFactory);
}
- propagateEvent = eventBuilder.build();
}
- if (!listeners.isEmpty()) {
- addPartialTask(listeners, propagateEvent);
- }
- return propagateEvent;
- }
- private DOMImmutableDataChangeEvent resolveSubtreeChangeEvent(final YangInstanceIdentifier path,
- final Collection<ListenerTree.Node> listeners, final DataTreeCandidateNode modification) {
+ final DOMImmutableDataChangeEvent event = eventFactory.create(state.getPath(), node);
+ LOG.trace("Adding event {} at path {}", event, state.getPath());
+ state.addEvent(event);
+ state.collectEvents(event.getOriginalSubtree(), event.getUpdatedSubtree(), collectedEvents);
+ }
- Preconditions.checkArgument(modification.getDataBefore().isPresent(), "Subtree change with before-data not present at path %s", path);
- Preconditions.checkArgument(modification.getDataAfter().isPresent(), "Subtree change with after-data not present at path %s", path);
+ private boolean resolveSubtreeChangeEvent(final ResolveDataChangeState state, final DataTreeCandidateNode modification) {
+ Preconditions.checkArgument(modification.getDataBefore().isPresent(), "Subtree change with before-data not present at path %s", state.getPath());
+ Preconditions.checkArgument(modification.getDataAfter().isPresent(), "Subtree change with after-data not present at path %s", state.getPath());
- Builder one = builder(DataChangeScope.ONE).
- setBefore(modification.getDataBefore().get()).
- setAfter(modification.getDataAfter().get());
- Builder subtree = builder(DataChangeScope.SUBTREE).
- setBefore(modification.getDataBefore().get()).
- setAfter(modification.getDataAfter().get());
- boolean oneModified = false;
+ DataChangeScope scope = null;
for (DataTreeCandidateNode childMod : modification.getChildNodes()) {
- PathArgument childId = childMod.getIdentifier();
- YangInstanceIdentifier childPath = path.node(childId);
- Collection<ListenerTree.Node> childListeners = getListenerChildrenWildcarded(listeners, childId);
-
+ final ResolveDataChangeState childState = state.child(childMod.getIdentifier());
switch (childMod.getModificationType()) {
case WRITE:
case MERGE:
case DELETE:
- one.merge(resolveAnyChangeEvent(childPath, childListeners, childMod));
- oneModified = true;
+ if (resolveAnyChangeEvent(childState, childMod)) {
+ scope = DataChangeScope.ONE;
+ }
break;
case SUBTREE_MODIFIED:
- subtree.merge(resolveSubtreeChangeEvent(childPath, childListeners, childMod));
+ if (resolveSubtreeChangeEvent(childState, childMod) && scope == null) {
+ scope = DataChangeScope.SUBTREE;
+ }
break;
case UNMODIFIED:
// no-op
break;
}
}
- final DOMImmutableDataChangeEvent oneChangeEvent;
- if(oneModified) {
- one.addUpdated(path, modification.getDataBefore().get(), modification.getDataAfter().get());
- oneChangeEvent = one.build();
- subtree.merge(oneChangeEvent);
- } else {
- oneChangeEvent = null;
- subtree.addUpdated(path, modification.getDataBefore().get(), modification.getDataAfter().get());
- }
- DOMImmutableDataChangeEvent subtreeEvent = subtree.build();
- if (!listeners.isEmpty()) {
- if(oneChangeEvent != null) {
- addPartialTask(listeners, oneChangeEvent);
- }
- addPartialTask(listeners, subtreeEvent);
- }
- return subtreeEvent;
- }
- private DOMImmutableDataChangeEvent addPartialTask(final Collection<ListenerTree.Node> listeners,
- final DOMImmutableDataChangeEvent event) {
- for (ListenerTree.Node listenerNode : listeners) {
- if (!listenerNode.getListeners().isEmpty()) {
- LOG.trace("Adding event {} for listeners {}",event,listenerNode);
- events.put(listenerNode, event);
- }
- }
- return event;
- }
+ final NormalizedNode<?, ?> before = modification.getDataBefore().get();
+ final NormalizedNode<?, ?> after = modification.getDataAfter().get();
- private static Collection<ListenerTree.Node> getListenerChildrenWildcarded(final Collection<ListenerTree.Node> parentNodes,
- final PathArgument child) {
- if (parentNodes.isEmpty()) {
- return Collections.emptyList();
- }
- com.google.common.collect.ImmutableList.Builder<ListenerTree.Node> result = ImmutableList.builder();
- if (child instanceof NodeWithValue || child instanceof NodeIdentifierWithPredicates) {
- NodeIdentifier wildcardedIdentifier = new NodeIdentifier(child.getNodeType());
- addChildrenNodesToBuilder(result, parentNodes, wildcardedIdentifier);
+ if (scope != null) {
+ DOMImmutableDataChangeEvent one = DOMImmutableDataChangeEvent.builder(scope).addUpdated(state.getPath(), before, after).build();
+ state.addEvent(one);
}
- addChildrenNodesToBuilder(result, parentNodes, child);
- return result.build();
- }
- private static void addChildrenNodesToBuilder(final ImmutableList.Builder<ListenerTree.Node> result,
- final Collection<ListenerTree.Node> parentNodes, final PathArgument childIdentifier) {
- for (ListenerTree.Node node : parentNodes) {
- Optional<ListenerTree.Node> child = node.getChild(childIdentifier);
- if (child.isPresent()) {
- result.add(child.get());
- }
- }
+ state.collectEvents(before, after, collectedEvents);
+ return scope != null;
}
@SuppressWarnings("rawtypes")
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree.Node;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Recursion state used in {@link ResolveDataChangeEventsTask}. Instances of this
+ * method track which listeners are affected by a particular change node. It takes
+ * care of properly inheriting SUB/ONE listeners and also provides a means to
+ * understand when actual processing need not occur.
+ */
+final class ResolveDataChangeState {
+ private static final Logger LOG = LoggerFactory.getLogger(ResolveDataChangeState.class);
+ /**
+ * Inherited from all parents
+ */
+ private final Iterable<Builder> inheritedSub;
+ /**
+ * Inherited from immediate parent
+ */
+ private final Iterable<Builder> inheritedOne;
+ private final YangInstanceIdentifier nodeId;
+ private final Collection<Node> nodes;
+
+ private final Map<DataChangeListenerRegistration<?>, Builder> subBuilders = new HashMap<>();
+ private final Map<DataChangeListenerRegistration<?>, Builder> oneBuilders = new HashMap<>();
+ private final Map<DataChangeListenerRegistration<?>, Builder> baseBuilders = new HashMap<>();
+
+ private ResolveDataChangeState(final YangInstanceIdentifier nodeId,
+ final Iterable<Builder> inheritedSub, final Iterable<Builder> inheritedOne,
+ final Collection<Node> nodes) {
+ this.nodeId = Preconditions.checkNotNull(nodeId);
+ this.nodes = Preconditions.checkNotNull(nodes);
+ this.inheritedSub = Preconditions.checkNotNull(inheritedSub);
+ this.inheritedOne = Preconditions.checkNotNull(inheritedOne);
+
+ /*
+ * Collect the nodes which need to be propagated from us to the child.
+ */
+ for (Node n : nodes) {
+ for (DataChangeListenerRegistration<?> l : n.getListeners()) {
+ final Builder b = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE);
+ switch (l.getScope()) {
+ case BASE:
+ baseBuilders.put(l, b);
+ break;
+ case ONE:
+ oneBuilders.put(l, b);
+ break;
+ case SUBTREE:
+ subBuilders.put(l, b);
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * Create an initial state handle at a particular root node.
+ *
+ * @param rootId root instance identifier
+ * @param root root node
+ * @return
+ */
+ public static ResolveDataChangeState initial(final YangInstanceIdentifier rootId, final Node root) {
+ return new ResolveDataChangeState(rootId, Collections.<Builder>emptyList(),
+ Collections.<Builder>emptyList(), Collections.singletonList(root));
+ }
+
+ /**
+ * Create a state handle for iterating over a particular child.
+ *
+ * @param childId ID of the child
+ * @return State handle
+ */
+ public ResolveDataChangeState child(final PathArgument childId) {
+ return new ResolveDataChangeState(nodeId.node(childId),
+ Iterables.concat(inheritedSub, subBuilders.values()),
+ oneBuilders.values(), getListenerChildrenWildcarded(nodes, childId));
+ }
+
+ /**
+ * Get the current path
+ *
+ * @return Current path.
+ */
+ public YangInstanceIdentifier getPath() {
+ return nodeId;
+ }
+
+ /**
+ * Check if this child needs processing.
+ *
+ * @return True if processing needs to occur, false otherwise.
+ */
+ public boolean needsProcessing() {
+ // May have underlying listeners, so we need to process
+ if (!nodes.isEmpty()) {
+ return true;
+ }
+ // Have SUBTREE listeners
+ if (!Iterables.isEmpty(inheritedSub)) {
+ return true;
+ }
+ // Have ONE listeners
+ if (!Iterables.isEmpty(inheritedOne)) {
+ return true;
+ }
+
+ // FIXME: do we need anything else? If not, flip this to 'false'
+ return true;
+ }
+
+ /**
+ * Add an event to all current listeners.
+ *
+ * @param event
+ */
+ public void addEvent(final DOMImmutableDataChangeEvent event) {
+ // Subtree builders get always notified
+ for (Builder b : subBuilders.values()) {
+ b.merge(event);
+ }
+ for (Builder b : inheritedSub) {
+ b.merge(event);
+ }
+
+ if (event.getScope() == DataChangeScope.ONE || event.getScope() == DataChangeScope.BASE) {
+ for (Builder b : oneBuilders.values()) {
+ b.merge(event);
+ }
+ }
+
+ if (event.getScope() == DataChangeScope.BASE) {
+ for (Builder b : inheritedOne) {
+ b.merge(event);
+ }
+ for (Builder b : baseBuilders.values()) {
+ b.merge(event);
+ }
+ }
+ }
+
+ /**
+ * Gather all non-empty events into the provided map.
+ *
+ * @param before before-image
+ * @param after after-image
+ * @param map target map
+ */
+ public void collectEvents(final NormalizedNode<?, ?> before, final NormalizedNode<?, ?> after,
+ final Multimap<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> map) {
+ for (Entry<DataChangeListenerRegistration<?>, Builder> e : baseBuilders.entrySet()) {
+ final Builder b = e.getValue();
+ if (!b.isEmpty()) {
+ map.put(e.getKey(), b.setBefore(before).setAfter(after).build());
+ }
+ }
+ for (Entry<DataChangeListenerRegistration<?>, Builder> e : oneBuilders.entrySet()) {
+ final Builder b = e.getValue();
+ if (!b.isEmpty()) {
+ map.put(e.getKey(), b.setBefore(before).setAfter(after).build());
+ }
+ }
+ for (Entry<DataChangeListenerRegistration<?>, Builder> e : subBuilders.entrySet()) {
+ final Builder b = e.getValue();
+ if (!b.isEmpty()) {
+ map.put(e.getKey(), b.setBefore(before).setAfter(after).build());
+ }
+ }
+
+ LOG.trace("Collected events {}", map);
+ }
+
+ private static Collection<Node> getListenerChildrenWildcarded(final Collection<Node> parentNodes,
+ final PathArgument child) {
+ if (parentNodes.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ final List<Node> result = new ArrayList<>();
+ if (child instanceof NodeWithValue || child instanceof NodeIdentifierWithPredicates) {
+ NodeIdentifier wildcardedIdentifier = new NodeIdentifier(child.getNodeType());
+ addChildNodes(result, parentNodes, wildcardedIdentifier);
+ }
+ addChildNodes(result, parentNodes, child);
+ return result;
+ }
+
+ private static void addChildNodes(final List<Node> result, final Collection<Node> parentNodes, final PathArgument childIdentifier) {
+ for (Node node : parentNodes) {
+ Optional<Node> child = node.getChild(childIdentifier);
+ if (child.isPresent()) {
+ result.add(child.get());
+ }
+ }
+ }
+}
*/
package org.opendaylight.controller.md.sal.dom.store.impl;
+import com.google.common.util.concurrent.MoreExecutors;
+
import java.util.Collection;
import java.util.Map;
+
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeBuilder;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import com.google.common.util.concurrent.MoreExecutors;
-
public abstract class AbstractDataChangeListenerTest {
protected static final YangInstanceIdentifier TOP_LEVEL = YangInstanceIdentifier
}
}
+ /**
+ * Create a new test task. The task will operate on the backed database,
+ * and will use the proper background executor service.
+ *
+ * @return Test task initialized to clean up {@value #TOP_LEVEL} and its
+ * children.
+ */
public final DatastoreTestTask newTestTask() {
return new DatastoreTestTask(datastore, dclExecutorService).cleanup(DatastoreTestTask
.simpleDelete(TOP_LEVEL));
import org.opendaylight.controller.md.sal.dom.store.impl.DatastoreTestTask.WriteTransactionCustomizer;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+/**
+ * Base template for a test suite for testing DataChangeListener functionality.
+ */
public abstract class DefaultDataChangeListenerTestSuite extends AbstractDataChangeListenerTest {
protected static final String FOO_SIBLING = "foo-sibling";
+ /**
+ * Callback invoked when the test suite can modify task parameters.
+ *
+ * @param task Update task configuration as needed
+ */
abstract protected void customizeTask(DatastoreTestTask task);
@Test
assertNotNull(change);
- assertNotContains(change.getCreatedData(), TOP_LEVEL);
- assertContains(change.getCreatedData(), path(FOO), path(FOO, BAR));
+ /*
+ * Created data must not contain nested-list item, since that is two-level deep.
+ */
+ assertNotContains(change.getCreatedData(), TOP_LEVEL,path(FOO, BAR));
+ assertContains(change.getCreatedData(), path(FOO) );
assertEmpty(change.getUpdatedData());
assertEmpty(change.getRemovedPaths());
AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertNotNull(change);
-
- assertContains(change.getCreatedData(), path(FOO, BAZ));
+ /*
+ * Created data must NOT contain nested-list item since scope is base, and change is two
+ * level deep.
+ */
+ assertNotContains(change.getCreatedData(), path(FOO, BAZ));
assertContains(change.getUpdatedData(), path(FOO));
assertNotContains(change.getUpdatedData(), TOP_LEVEL);
- assertContains(change.getRemovedPaths(), path(FOO, BAR));
+ /*
+ * Removed data must NOT contain nested-list item since scope is base, and change is two
+ * level deep.
+ */
+ assertNotContains(change.getRemovedPaths(), path(FOO, BAR));
}
assertNotNull(change);
assertFalse(change.getCreatedData().isEmpty());
- assertContains(change.getCreatedData(), path(FOO), path(FOO, BAR), path(FOO, BAZ));
- assertNotContains(change.getCreatedData(), TOP_LEVEL);
+ // Base event should contain only changed item, no details about child.
+ assertContains(change.getCreatedData(), path(FOO));
+ assertNotContains(change.getCreatedData(), TOP_LEVEL,path(FOO, BAR), path(FOO, BAZ));
assertEmpty(change.getUpdatedData());
assertEmpty(change.getRemovedPaths());
assertEmpty(change.getUpdatedData());
assertNotContains(change.getUpdatedData(), TOP_LEVEL);
- assertContains(change.getRemovedPaths(), path(FOO),path(FOO, BAZ),path(FOO,BAR));
+ /*
+ * Scope base listener event should contain top-level-list item and nested list path
+ * and should not contain baz, bar which are two-level deep
+ */
+ assertContains(change.getRemovedPaths(), path(FOO));
+ assertNotContains(change.getRemovedPaths(),path(FOO, BAZ),path(FOO,BAR));
}
@Override
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelList;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.top.level.list.NestedList;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
assertNotNull(change);
- assertNotContains(change.getCreatedData(), TOP_LEVEL);
- assertContains(change.getCreatedData(), path(FOO), path(FOO, BAR));
+ assertNotContains(change.getCreatedData(), TOP_LEVEL,path(FOO, BAR));
+ assertContains(change.getCreatedData(), path(FOO), path(FOO).node(NestedList.QNAME));
assertEmpty(change.getUpdatedData());
assertEmpty(change.getRemovedPaths());
AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
assertNotNull(change);
-
- assertContains(change.getCreatedData(), path(FOO, BAZ));
- assertContains(change.getUpdatedData(), path(FOO));
+ /*
+ * Created data must NOT contain nested-list item since scope is base, and change is two
+ * level deep.
+ */
+ assertNotContains(change.getCreatedData(), path(FOO, BAZ));
+ assertContains(change.getUpdatedData(), path(FOO),path(FOO).node(NestedList.QNAME));
assertNotContains(change.getUpdatedData(), TOP_LEVEL);
- assertContains(change.getRemovedPaths(), path(FOO, BAR));
+ /*
+ * Removed data must NOT contain nested-list item since scope is base, and change is two
+ * level deep.
+ */
+ assertNotContains(change.getRemovedPaths(), path(FOO, BAR));
}
assertNotNull(change);
assertFalse(change.getCreatedData().isEmpty());
- assertContains(change.getCreatedData(), path(FOO), path(FOO, BAR), path(FOO, BAZ));
- assertNotContains(change.getCreatedData(), TOP_LEVEL);
+ // Base event should contain only changed item, and details about immediate child.
+ assertContains(change.getCreatedData(), path(FOO),path(FOO).node(NestedList.QNAME));
+ assertNotContains(change.getCreatedData(), TOP_LEVEL,path(FOO, BAR), path(FOO, BAZ));
assertEmpty(change.getUpdatedData());
assertEmpty(change.getRemovedPaths());
assertEmpty(change.getUpdatedData());
assertNotContains(change.getUpdatedData(), TOP_LEVEL);
- assertContains(change.getRemovedPaths(), path(FOO),path(FOO, BAZ),path(FOO,BAR));
+ assertContains(change.getRemovedPaths(), path(FOO),path(FOO).node(NestedList.QNAME));
+ assertNotContains(change.getRemovedPaths(), path(FOO, BAZ),path(FOO,BAR));
}
@Override
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
-
import com.google.common.collect.Sets;
+
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
}
// FIXME: do we really want to continue here?
- moduleBasedCaps.add(QName.create(namespace, revision, moduleName));
+ moduleBasedCaps.add(QName.cachedReference(QName.create(namespace, revision, moduleName)));
nonModuleCaps.remove(capability);
}
public RpcRegistry() {
bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
+
+ log.info("Bucket store path = {}", bucketStore.path().toString());
}
public RpcRegistry(ActorRef bucketStore) {
import akka.cluster.Cluster;
import akka.event.Logging;
import akka.event.LoggingAdapter;
+import org.opendaylight.controller.utils.ConditionalProbe;
import java.util.HashMap;
import java.util.Map;
*/
private ActorRef gossiper;
+ private ConditionalProbe probe;
+
public BucketStore(){
gossiper = getContext().actorOf(Props.create(Gossiper.class), "gossiper");
}
@Override
public void onReceive(Object message) throws Exception {
- log.debug("Received message: node[{}], message[{}]", selfAddress, message);
+ log.debug("Received message: node[{}], message[{}]", selfAddress,
+ message);
- if (message instanceof UpdateBucket)
- receiveUpdateBucket(((UpdateBucket) message).getBucket());
+ if (probe != null) {
- else if (message instanceof GetAllBuckets)
- receiveGetAllBucket();
+ probe.tell(message, getSelf());
+ }
- else if (message instanceof GetLocalBucket)
+ if (message instanceof ConditionalProbe) {
+ log.info("Received probe {} {}", getSelf(), message);
+ probe = (ConditionalProbe) message;
+ } else if (message instanceof UpdateBucket) {
+ receiveUpdateBucket(((UpdateBucket) message).getBucket());
+ } else if (message instanceof GetAllBuckets) {
+ receiveGetAllBucket();
+ } else if (message instanceof GetLocalBucket) {
receiveGetLocalBucket();
-
- else if (message instanceof GetBucketsByMembers)
- receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
-
- else if (message instanceof GetBucketVersions)
+ } else if (message instanceof GetBucketsByMembers) {
+ receiveGetBucketsByMembers(
+ ((GetBucketsByMembers) message).getMembers());
+ } else if (message instanceof GetBucketVersions) {
receiveGetBucketVersions();
-
- else if (message instanceof UpdateRemoteBuckets)
- receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets());
-
- else {
+ } else if (message instanceof UpdateRemoteBuckets) {
+ receiveUpdateRemoteBuckets(
+ ((UpdateRemoteBuckets) message).getBuckets());
+ } else {
log.debug("Unhandled message [{}]", message);
unhandled(message);
}
Address getSelfAddress() {
return selfAddress;
}
+
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.utils;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Predicate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConditionalProbe {
+ private final ActorRef actorRef;
+ private final Predicate predicate;
+ Logger log = LoggerFactory.getLogger(ConditionalProbe.class);
+
+ public ConditionalProbe(ActorRef actorRef, Predicate predicate) {
+ this.actorRef = actorRef;
+ this.predicate = predicate;
+ }
+
+ public void tell(Object message, ActorRef sender){
+ if(predicate.apply(message)) {
+ log.info("sending message to probe {}", message);
+ actorRef.tell(message, sender);
+ }
+ }
+}
package org.opendaylight.controller.remote.rpc.registry;
+
import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.ChildActorPath;
import akka.actor.Props;
-import akka.japi.Pair;
import akka.testkit.JavaTestKit;
+import com.google.common.base.Predicate;
import com.typesafe.config.ConfigFactory;
+
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.utils.ConditionalProbe;
import org.opendaylight.yangtools.yang.common.QName;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
+import javax.annotation.Nullable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRoutersReply;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
public class RpcRegistryTest {
- private static ActorSystem node1;
- private static ActorSystem node2;
- private static ActorSystem node3;
-
- private ActorRef registry1;
- private ActorRef registry2;
- private ActorRef registry3;
-
- @BeforeClass
- public static void setup() throws InterruptedException {
- Thread.sleep(1000); //give some time for previous test to close netty ports
- node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA"));
- node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB"));
- node3 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberC"));
- }
-
- @AfterClass
- public static void teardown(){
- JavaTestKit.shutdownActorSystem(node1);
- JavaTestKit.shutdownActorSystem(node2);
- JavaTestKit.shutdownActorSystem(node3);
- if (node1 != null)
- node1.shutdown();
- if (node2 != null)
- node2.shutdown();
- if (node3 != null)
- node3.shutdown();
-
- }
-
- @Before
- public void createRpcRegistry() throws InterruptedException {
- registry1 = node1.actorOf(Props.create(RpcRegistry.class));
- registry2 = node2.actorOf(Props.create(RpcRegistry.class));
- registry3 = node3.actorOf(Props.create(RpcRegistry.class));
- }
-
- @After
- public void stopRpcRegistry() throws InterruptedException {
- if (registry1 != null)
- node1.stop(registry1);
- if (registry2 != null)
- node2.stop(registry2);
- if (registry3 != null)
- node3.stop(registry3);
- }
+ private static ActorSystem node1;
+ private static ActorSystem node2;
+ private static ActorSystem node3;
+
+ private ActorRef registry1;
+ private ActorRef registry2;
+ private ActorRef registry3;
+
+ @BeforeClass
+ public static void setup() throws InterruptedException {
+ node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA"));
+ node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB"));
+ node3 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberC"));
+ }
+
+ @AfterClass
+ public static void teardown() {
+ JavaTestKit.shutdownActorSystem(node1);
+ JavaTestKit.shutdownActorSystem(node2);
+ JavaTestKit.shutdownActorSystem(node3);
+ if (node1 != null)
+ node1.shutdown();
+ if (node2 != null)
+ node2.shutdown();
+ if (node3 != null)
+ node3.shutdown();
+
+ }
+
+ @Before
+ public void createRpcRegistry() throws InterruptedException {
+ registry1 = node1.actorOf(Props.create(RpcRegistry.class));
+ registry2 = node2.actorOf(Props.create(RpcRegistry.class));
+ registry3 = node3.actorOf(Props.create(RpcRegistry.class));
+ }
+
+ @After
+ public void stopRpcRegistry() throws InterruptedException {
+ if (registry1 != null)
+ node1.stop(registry1);
+ if (registry2 != null)
+ node2.stop(registry2);
+ if (registry3 != null)
+ node3.stop(registry3);
+ }
+
+ /**
+ * One node cluster.
+ * 1. Register rpc, ensure router can be found
+ * 2. Then remove rpc, ensure its deleted
+ *
+ * @throws URISyntaxException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testAddRemoveRpcOnSameNode() throws URISyntaxException, InterruptedException {
+ validateSystemStartup();
+
+ final JavaTestKit mockBroker = new JavaTestKit(node1);
+
+ final ActorPath bucketStorePath = new ChildActorPath(registry1.path(), "store");
+
+ //install probe
+ final JavaTestKit probe1 = createProbeForMessage(
+ node1, bucketStorePath, Messages.BucketStoreMessages.UpdateBucket.class);
+
+ //Add rpc on node 1
+ registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef());
+ registry1.tell(getAddRouteMessage(), mockBroker.getRef());
+
+ //Bucket store should get an update bucket message. Updated bucket contains added rpc.
+ probe1.expectMsgClass(
+ FiniteDuration.apply(10, TimeUnit.SECONDS),
+ Messages.BucketStoreMessages.UpdateBucket.class);
+
+ //Now remove rpc
+ registry1.tell(getRemoveRouteMessage(), mockBroker.getRef());
+
+ //Bucket store should get an update bucket message. Rpc is removed in the updated bucket
+ probe1.expectMsgClass(
+ FiniteDuration.apply(10, TimeUnit.SECONDS),
+ Messages.BucketStoreMessages.UpdateBucket.class);
+
+
+ }
+
+
+ /**
+ * Three node cluster.
+ * 1. Register rpc on 1 node, ensure 2nd node gets updated
+ * 2. Remove rpc on 1 node, ensure 2nd node gets updated
+ *
+ * @throws URISyntaxException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testRpcAddRemoveInCluster() throws URISyntaxException, InterruptedException {
- /**
- * One node cluster.
- * 1. Register rpc, ensure router can be found
- * 2. Then remove rpc, ensure its deleted
- *
- * @throws URISyntaxException
- * @throws InterruptedException
- */
- @Test
- public void testAddRemoveRpcOnSameNode() throws URISyntaxException, InterruptedException {
-
- final JavaTestKit mockBroker = new JavaTestKit(node1);
-
- //Add rpc on node 1
- registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef());
- registry1.tell(getAddRouteMessage(), mockBroker.getRef());
-
- Thread.sleep(1000);//
-
- //find the route on node 1's registry
- registry1.tell(new FindRouters(createRouteId()), mockBroker.getRef());
- FindRoutersReply message = mockBroker.expectMsgClass(JavaTestKit.duration("10 second"), FindRoutersReply.class);
- List<Pair<ActorRef, Long>> pairs = message.getRouterWithUpdateTime();
-
- validateRouterReceived(pairs, mockBroker.getRef());
-
- //Now remove rpc
- registry1.tell(getRemoveRouteMessage(), mockBroker.getRef());
- Thread.sleep(1000);
- //find the route on node 1's registry
- registry1.tell(new FindRouters(createRouteId()), mockBroker.getRef());
- message = mockBroker.expectMsgClass(JavaTestKit.duration("10 second"), FindRoutersReply.class);
- pairs = message.getRouterWithUpdateTime();
-
- Assert.assertTrue(pairs.isEmpty());
- }
+ validateSystemStartup();
+
+ final JavaTestKit mockBroker1 = new JavaTestKit(node1);
+
+ //install probe on node2's bucket store
+ final ActorPath bucketStorePath = new ChildActorPath(registry2.path(), "store");
+ final JavaTestKit probe2 = createProbeForMessage(
+ node2, bucketStorePath, Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
- /**
- * Three node cluster.
- * 1. Register rpc on 1 node, ensure its router can be found on other 2.
- * 2. Remove rpc on 1 node, ensure its removed on other 2.
- *
- * @throws URISyntaxException
- * @throws InterruptedException
- */
- @Test
- public void testRpcAddRemoveInCluster() throws URISyntaxException, InterruptedException {
- validateSystemStartup();
+ //Add rpc on node 1
+ registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
+ registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
- final JavaTestKit mockBroker1 = new JavaTestKit(node1);
- final JavaTestKit mockBroker2 = new JavaTestKit(node2);
- final JavaTestKit mockBroker3 = new JavaTestKit(node3);
+ //Bucket store on node2 should get a message to update its local copy of remote buckets
+ probe2.expectMsgClass(
+ FiniteDuration.apply(10, TimeUnit.SECONDS),
+ Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
- //Add rpc on node 1
- registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
- registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
+ //Now remove
+ registry1.tell(getRemoveRouteMessage(), mockBroker1.getRef());
- Thread.sleep(1000);// give some time for bucket store data sync
+ //Bucket store on node2 should get a message to update its local copy of remote buckets
+ probe2.expectMsgClass(
+ FiniteDuration.apply(10, TimeUnit.SECONDS),
+ Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
- //find the route in node 2's registry
- List<Pair<ActorRef, Long>> pairs = findRouters(registry2, mockBroker2);
- validateRouterReceived(pairs, mockBroker1.getRef());
+ }
- //find the route in node 3's registry
- pairs = findRouters(registry3, mockBroker3);
- validateRouterReceived(pairs, mockBroker1.getRef());
+ /**
+ * Three node cluster.
+ * Register rpc on 2 nodes. Ensure 3rd gets updated.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testRpcAddedOnMultiNodes() throws Exception {
- //Now remove
- registry1.tell(getRemoveRouteMessage(), mockBroker1.getRef());
- Thread.sleep(1000);// give some time for bucket store data sync
+ validateSystemStartup();
- pairs = findRouters(registry2, mockBroker2);
- Assert.assertTrue(pairs.isEmpty());
+ final JavaTestKit mockBroker1 = new JavaTestKit(node1);
+ final JavaTestKit mockBroker2 = new JavaTestKit(node2);
+ final JavaTestKit mockBroker3 = new JavaTestKit(node3);
- pairs = findRouters(registry3, mockBroker3);
- Assert.assertTrue(pairs.isEmpty());
- }
+ registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef());
- /**
- * Three node cluster.
- * Register rpc on 2 nodes. Ensure 2 routers are found on 3rd.
- *
- * @throws Exception
- */
- @Test
- public void testAnRpcAddedOnMultiNodesShouldReturnMultiRouter() throws Exception {
+ //install probe on node 3
+ final ActorPath bucketStorePath = new ChildActorPath(registry3.path(), "store");
+ final JavaTestKit probe3 = createProbeForMessage(
+ node3, bucketStorePath, Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
- validateSystemStartup();
- final JavaTestKit mockBroker1 = new JavaTestKit(node1);
- final JavaTestKit mockBroker2 = new JavaTestKit(node2);
- final JavaTestKit mockBroker3 = new JavaTestKit(node3);
+ //Add rpc on node 1
+ registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
+ registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
- //Thread.sleep(5000);//let system come up
+ probe3.expectMsgClass(
+ FiniteDuration.apply(10, TimeUnit.SECONDS),
+ Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
- //Add rpc on node 1
- registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
- registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
- //Add same rpc on node 2
- registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
- registry2.tell(getAddRouteMessage(), mockBroker2.getRef());
+ //Add same rpc on node 2
+ registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
+ registry2.tell(getAddRouteMessage(), mockBroker2.getRef());
- registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef());
- Thread.sleep(1000);// give some time for bucket store data sync
+ probe3.expectMsgClass(
+ FiniteDuration.apply(10, TimeUnit.SECONDS),
+ Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+ }
- //find the route in node 3's registry
- registry3.tell(new FindRouters(createRouteId()), mockBroker3.getRef());
- FindRoutersReply message = mockBroker3.expectMsgClass(JavaTestKit.duration("10 second"), FindRoutersReply.class);
- List<Pair<ActorRef, Long>> pairs = message.getRouterWithUpdateTime();
+ private JavaTestKit createProbeForMessage(ActorSystem node, ActorPath subjectPath, final Class clazz) {
+ final JavaTestKit probe = new JavaTestKit(node);
- validateMultiRouterReceived(pairs, mockBroker1.getRef(), mockBroker2.getRef());
+ ConditionalProbe conditionalProbe =
+ new ConditionalProbe(probe.getRef(), new Predicate() {
+ @Override
+ public boolean apply(@Nullable Object input) {
+ return clazz.equals(input.getClass());
+ }
+ });
- }
+ ActorSelection subject = node.actorSelection(subjectPath);
+ subject.tell(conditionalProbe, ActorRef.noSender());
- private List<Pair<ActorRef, Long>> findRouters(ActorRef registry, JavaTestKit receivingActor) throws URISyntaxException {
- registry.tell(new FindRouters(createRouteId()), receivingActor.getRef());
- FindRoutersReply message = receivingActor.expectMsgClass(JavaTestKit.duration("10 second"), FindRoutersReply.class);
- return message.getRouterWithUpdateTime();
- }
+ return probe;
- private void validateMultiRouterReceived(List<Pair<ActorRef, Long>> actual, ActorRef... expected) {
- Assert.assertTrue(actual != null);
- Assert.assertTrue(actual.size() == expected.length);
- }
+ }
- private void validateRouterReceived(List<Pair<ActorRef, Long>> actual, ActorRef expected){
- Assert.assertTrue(actual != null);
- Assert.assertTrue(actual.size() == 1);
+ private void validateSystemStartup() throws InterruptedException {
- for (Pair<ActorRef, Long> pair : actual){
- Assert.assertTrue(expected.path().uid() == pair.first().path().uid());
- }
- }
+ ActorPath gossiper1Path = new ChildActorPath(new ChildActorPath(registry1.path(), "store"), "gossiper");
+ ActorPath gossiper2Path = new ChildActorPath(new ChildActorPath(registry2.path(), "store"), "gossiper");
+ ActorPath gossiper3Path = new ChildActorPath(new ChildActorPath(registry3.path(), "store"), "gossiper");
- private void validateSystemStartup() throws InterruptedException {
+ ActorSelection gossiper1 = node1.actorSelection(gossiper1Path);
+ ActorSelection gossiper2 = node2.actorSelection(gossiper2Path);
+ ActorSelection gossiper3 = node3.actorSelection(gossiper3Path);
- Thread.sleep(5000);
- ActorPath gossiper1Path = new ChildActorPath(new ChildActorPath(registry1.path(), "store"), "gossiper");
- ActorPath gossiper2Path = new ChildActorPath(new ChildActorPath(registry2.path(), "store"), "gossiper");
- ActorPath gossiper3Path = new ChildActorPath(new ChildActorPath(registry3.path(), "store"), "gossiper");
- ActorSelection gossiper1 = node1.actorSelection(gossiper1Path);
- ActorSelection gossiper2 = node2.actorSelection(gossiper2Path);
- ActorSelection gossiper3 = node3.actorSelection(gossiper3Path);
+ if (!resolveReference(gossiper1, gossiper2, gossiper3))
+ Assert.fail("Could not find gossipers");
+ }
+ private Boolean resolveReference(ActorSelection... gossipers) {
- if (!resolveReference(gossiper1, gossiper2, gossiper3))
- Assert.fail("Could not find gossipers");
- }
+ Boolean resolved = true;
+ for (int i = 0; i < 5; i++) {
- private Boolean resolveReference(ActorSelection... gossipers) throws InterruptedException {
+ resolved = true;
+ System.out.println(System.currentTimeMillis() + " Resolving gossipers; trial #" + i);
- Boolean resolved = true;
+ for (ActorSelection gossiper : gossipers) {
+ ActorRef ref = null;
- for (int i=0; i< 5; i++) {
- Thread.sleep(1000);
- for (ActorSelection gossiper : gossipers) {
- Future<ActorRef> future = gossiper.resolveOne(new FiniteDuration(5000, TimeUnit.MILLISECONDS));
+ try {
+ Future<ActorRef> future = gossiper.resolveOne(new FiniteDuration(15000, TimeUnit.MILLISECONDS));
+ ref = Await.result(future, new FiniteDuration(10000, TimeUnit.MILLISECONDS));
+ } catch (Exception e) {
+ System.out.println("Could not find gossiper in attempt#" + i + ". Got exception " + e.getMessage());
+ }
- ActorRef ref = null;
- try {
- ref = Await.result(future, new FiniteDuration(10000, TimeUnit.MILLISECONDS));
- } catch (Exception e) {
- e.printStackTrace();
- }
+ if (ref == null)
+ resolved = false;
+ }
- if (ref == null)
- resolved = false;
- }
+ if (resolved) break;
- if (resolved) break;
- }
- return resolved;
}
+ return resolved;
+ }
- private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException {
- return new AddOrUpdateRoutes(createRouteIds());
- }
+ private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException {
+ return new AddOrUpdateRoutes(createRouteIds());
+ }
- private RemoveRoutes getRemoveRouteMessage() throws URISyntaxException {
- return new RemoveRoutes(createRouteIds());
- }
+ private RemoveRoutes getRemoveRouteMessage() throws URISyntaxException {
+ return new RemoveRoutes(createRouteIds());
+ }
- private List<RpcRouter.RouteIdentifier<?,?,?>> createRouteIds() throws URISyntaxException {
- QName type = new QName(new URI("/mockrpc"), "mockrpc");
- List<RpcRouter.RouteIdentifier<?,?,?>> routeIds = new ArrayList<>();
- routeIds.add(new RouteIdentifierImpl(null, type, null));
- return routeIds;
- }
+ private List<RpcRouter.RouteIdentifier<?, ?, ?>> createRouteIds() throws URISyntaxException {
+ QName type = new QName(new URI("/mockrpc"), "mockrpc");
+ List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new ArrayList<>();
+ routeIds.add(new RouteIdentifierImpl(null, type, null));
+ return routeIds;
+ }
- private RpcRouter.RouteIdentifier<?,?,?> createRouteId() throws URISyntaxException {
- QName type = new QName(new URI("/mockrpc"), "mockrpc");
- return new RouteIdentifierImpl(null, type, null);
- }
-}
\ No newline at end of file
+}
odl-cluster{
akka {
- loglevel = "INFO"
+ loglevel = "DEBUG"
#log-config-on-start = on
actor {
loggers = ["akka.event.slf4j.Slf4jLogger"]
actor {
provider = "akka.cluster.ClusterActorRefProvider"
+ debug {
+ #lifecycle = on
+ }
}
remote {
log-received-messages = off
--- /dev/null
+<configuration scan="true">
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+ </pattern>
+ </encoder>
+ </appender>
+
+ <root level="debug">
+ <appender-ref ref="STDOUT" />
+ </root>
+</configuration>
package org.opendaylight.controller.netconf.client;
import io.netty.channel.Channel;
+
import java.util.Collection;
+
import org.opendaylight.controller.netconf.nettyutil.AbstractNetconfSession;
import org.opendaylight.controller.netconf.nettyutil.handler.NetconfEXICodec;
import org.opendaylight.controller.netconf.nettyutil.handler.NetconfEXIToMessageDecoder;
private static final Logger logger = LoggerFactory.getLogger(NetconfClientSession.class);
private final Collection<String> capabilities;
- public NetconfClientSession(NetconfClientSessionListener sessionListener, Channel channel, long sessionId,
- Collection<String> capabilities) {
+ /**
+ * Construct a new session.
+ *
+ * @param sessionListener
+ * @param channel
+ * @param sessionId
+ * @param capabilities set of advertised capabilities. Expected to be immutable.
+ */
+ public NetconfClientSession(final NetconfClientSessionListener sessionListener, final Channel channel, final long sessionId,
+ final Collection<String> capabilities) {
super(sessionListener, channel, sessionId);
this.capabilities = capabilities;
logger.debug("Client Session {} created", toString());
}
@Override
- protected void addExiHandlers(NetconfEXICodec exiCodec) {
+ protected void addExiHandlers(final NetconfEXICodec exiCodec) {
// TODO used only in negotiator, client supports only auto start-exi
replaceMessageDecoder(new NetconfEXIToMessageDecoder(exiCodec));
replaceMessageEncoder(new NetconfMessageToEXIEncoder(exiCodec));
package org.opendaylight.controller.netconf.client;
+import com.google.common.collect.ImmutableList;
+
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
private static final String EXI_1_0_CAPABILITY_MARKER = "exi:1.0";
- protected NetconfClientSessionNegotiator(NetconfClientSessionPreferences sessionPreferences,
- Promise<NetconfClientSession> promise,
- Channel channel,
- Timer timer,
- NetconfClientSessionListener sessionListener,
- long connectionTimeoutMillis) {
+ protected NetconfClientSessionNegotiator(final NetconfClientSessionPreferences sessionPreferences,
+ final Promise<NetconfClientSession> promise,
+ final Channel channel,
+ final Timer timer,
+ final NetconfClientSessionListener sessionListener,
+ final long connectionTimeoutMillis) {
super(sessionPreferences, promise, channel, timer, sessionListener, connectionTimeoutMillis);
}
@Override
- protected void handleMessage(NetconfHelloMessage netconfMessage) throws NetconfDocumentedException {
+ protected void handleMessage(final NetconfHelloMessage netconfMessage) throws NetconfDocumentedException {
final NetconfClientSession session = getSessionForHelloMessage(netconfMessage);
replaceHelloMessageInboundHandler(session);
});
}
- private boolean shouldUseExi(NetconfHelloMessage helloMsg) {
+ private boolean shouldUseExi(final NetconfHelloMessage helloMsg) {
return containsExi10Capability(helloMsg.getDocument())
&& containsExi10Capability(sessionPreferences.getHelloMessage().getDocument());
}
return false;
}
- private long extractSessionId(Document doc) {
+ private long extractSessionId(final Document doc) {
final Node sessionIdNode = (Node) XmlUtil.evaluateXPath(sessionIdXPath, doc, XPathConstants.NODE);
String textContent = sessionIdNode.getTextContent();
if (textContent == null || textContent.equals("")) {
}
@Override
- protected NetconfClientSession getSession(NetconfClientSessionListener sessionListener, Channel channel,
- NetconfHelloMessage message) throws NetconfDocumentedException {
+ protected NetconfClientSession getSession(final NetconfClientSessionListener sessionListener, final Channel channel,
+ final NetconfHelloMessage message) throws NetconfDocumentedException {
long sessionId = extractSessionId(message.getDocument());
- Collection<String> capabilities = NetconfMessageUtil.extractCapabilitiesFromHello(message.getDocument());
+
+ // Copy here is important: it disconnects the strings from the document
+ Collection<String> capabilities = ImmutableList.copyOf(NetconfMessageUtil.extractCapabilitiesFromHello(message.getDocument()));
+
+ // FIXME: scalability: we could instantiate a cache to share the same collections
return new NetconfClientSession(sessionListener, channel, sessionId, capabilities);
}
private static final String EXI_CONFIRMED_HANDLER = "exiConfirmedHandler";
private final NetconfClientSession session;
- private NetconfStartExiMessage startExiMessage;
+ private final NetconfStartExiMessage startExiMessage;
- ExiConfirmationInboundHandler(NetconfClientSession session, final NetconfStartExiMessage startExiMessage) {
+ ExiConfirmationInboundHandler(final NetconfClientSession session, final NetconfStartExiMessage startExiMessage) {
this.session = session;
this.startExiMessage = startExiMessage;
}
@Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
ctx.pipeline().remove(ExiConfirmationInboundHandler.EXI_CONFIRMED_HANDLER);
NetconfMessage netconfMessage = (NetconfMessage) msg;
private OutputStream stdIn;
- private Queue<ByteBuf> postponed = new LinkedList<>();
+ private final Queue<ByteBuf> postponed = new LinkedList<>();
private ChannelHandlerContext ctx;
private ChannelPromise disconnectPromise;
private final Object lock = new Object();
- public SshClientAdapter(SshClient sshClient, Invoker invoker) {
+ public SshClientAdapter(final SshClient sshClient, final Invoker invoker) {
this.sshClient = sshClient;
this.invoker = invoker;
}
- // TODO: refactor
+ // TODO ganymed spawns a Thread that receives the data from remote inside TransportManager
+ // Get rid of this thread and reuse Ganymed internal thread (not sure if its possible without modifications in ganymed)
public void run() {
try {
- SshSession session = sshClient.openSession();
+ final SshSession session = sshClient.openSession();
invoker.invoke(session);
- InputStream stdOut = session.getStdout();
- session.getStderr();
+ final InputStream stdOut = session.getStdout();
synchronized (lock) {
-
stdIn = session.getStdin();
- ByteBuf message;
- while ((message = postponed.poll()) != null) {
- writeImpl(message);
+ while (postponed.peek() != null) {
+ writeImpl(postponed.poll());
}
}
while (!stopRequested.get()) {
- byte[] readBuff = new byte[BUFFER_SIZE];
- int c = stdOut.read(readBuff);
+ final byte[] readBuff = new byte[BUFFER_SIZE];
+ final int c = stdOut.read(readBuff);
if (c == -1) {
continue;
}
- byte[] tranBuff = new byte[c];
- System.arraycopy(readBuff, 0, tranBuff, 0, c);
- ByteBuf byteBuf = Unpooled.buffer(c);
- byteBuf.writeBytes(tranBuff);
- ctx.fireChannelRead(byteBuf);
+ ctx.fireChannelRead(Unpooled.copiedBuffer(readBuff, 0, c));
}
- } catch (Exception e) {
+ } catch (final Exception e) {
logger.error("Unexpected exception", e);
} finally {
sshClient.close();
}
// TODO: needs rework to match netconf framer API.
- public void write(ByteBuf message) throws IOException {
+ public void write(final ByteBuf message) throws IOException {
synchronized (lock) {
if (stdIn == null) {
postponed.add(message);
}
}
- private void writeImpl(ByteBuf message) throws IOException {
+ private void writeImpl(final ByteBuf message) throws IOException {
message.getBytes(0, stdIn, message.readableBytes());
message.release();
stdIn.flush();
}
- public void stop(ChannelPromise promise) {
+ public void stop(final ChannelPromise promise) {
synchronized (lock) {
stopRequested.set(true);
disconnectPromise = promise;
}
}
- public Thread start(ChannelHandlerContext ctx, ChannelFuture channelFuture) {
+ public Thread start(final ChannelHandlerContext ctx, final ChannelFuture channelFuture) {
checkArgument(channelFuture.isSuccess());
checkNotNull(ctx.channel().remoteAddress());
synchronized (this) {
checkState(this.ctx == null);
this.ctx = ctx;
}
- String threadName = toString();
- Thread thread = new Thread(this, threadName);
+ final String threadName = toString();
+ final Thread thread = new Thread(this, threadName);
thread.start();
return thread;
}
package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
import ch.ethz.ssh2.Session;
-import ch.ethz.ssh2.StreamGobbler;
-
import ch.ethz.ssh2.channel.Channel;
import java.io.Closeable;
import java.io.IOException;
class SshSession implements Closeable {
private final Session session;
- public SshSession(Session session) {
+ public SshSession(final Session session) {
this.session = session;
}
-
- public void startSubSystem(String name) throws IOException {
+ public void startSubSystem(final String name) throws IOException {
session.startSubSystem(name);
}
public InputStream getStdout() {
- return new StreamGobbler(session.getStdout());
+ return session.getStdout();
}
+ // FIXME according to http://www.ganymed.ethz.ch/ssh2/FAQ.html#blocking you should read data from both stdout and stderr to prevent window filling up (since stdout and stderr share a window)
+ // FIXME stdErr is not used anywhere
public InputStream getStderr() {
return session.getStderr();
}
ChannelFuture clientChannelFuture = initializeNettyConnection(localAddress, bossGroup, sshClientHandler);
// get channel
final Channel channel = clientChannelFuture.awaitUninterruptibly().channel();
+
+ // write additional header before polling thread is started
+ // polling thread could process and forward data before additional header is written
+ // This will result into unexpected state: hello message without additional header and the next message with additional header
+ channel.writeAndFlush(Unpooled.copiedBuffer(additionalHeader.getBytes()));
+
new ClientInputStreamPoolingThread(session, ss.getStdout(), channel, new AutoCloseable() {
@Override
public void close() throws Exception {
}
}
}, sshClientHandler.getChannelHandlerContext()).start();
-
- // write additional header
- channel.writeAndFlush(Unpooled.copiedBuffer(additionalHeader.getBytes()));
} else {
logger.debug("{} Wrong subsystem requested:'{}', closing ssh session", serverSession, subsystem);
String reason = "Only netconf subsystem is supported, requested:" + subsystem;
import java.io.InputStreamReader;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.annotation.Arg;
import net.sourceforge.argparse4j.inf.ArgumentParser;
@Arg(dest = "starting-port")
public int startingPort;
+ @Arg(dest = "generate-config-connection-timeout")
+ public int generateConfigsTimeout;
+
@Arg(dest = "generate-configs-dir")
public File generateConfigsDir;
@Arg(dest = "ssh")
public boolean ssh;
+ @Arg(dest = "exi")
+ public boolean exi;
+
static ArgumentParser getParser() {
final ArgumentParser parser = ArgumentParsers.newArgumentParser("netconf testool");
parser.addArgument("--devices-count")
.help("First port for simulated device. Each other device will have previous+1 port number")
.dest("starting-port");
+ parser.addArgument("--generate-config-connection-timeout")
+ .type(Integer.class)
+ .setDefault((int)TimeUnit.MINUTES.toMillis(5))
+ .help("Timeout to be generated in initial config files")
+ .dest("generate-config-connection-timeout");
+
parser.addArgument("--generate-configs-batch-size")
.type(Integer.class)
.setDefault(100)
.help("Whether to use ssh for transport or just pure tcp")
.dest("ssh");
+ parser.addArgument("--exi")
+ .type(Boolean.class)
+ .setDefault(false)
+ .help("Whether to use exi to transport xml content")
+ .dest("exi");
+
return parser;
}
}
public static void main(final String[] args) {
+ ch.ethz.ssh2.log.Logger.enabled = true;
+
final Params params = parseArgs(args, Params.getParser());
params.validate();
try {
final List<Integer> openDevices = netconfDeviceSimulator.start(params);
if(params.generateConfigsDir != null) {
- new ConfigGenerator(params.generateConfigsDir, openDevices).generate(params.ssh, params.generateConfigBatchSize);
+ new ConfigGenerator(params.generateConfigsDir, openDevices).generate(params.ssh, params.generateConfigBatchSize, params.generateConfigsTimeout);
}
} catch (final Exception e) {
LOG.error("Unhandled exception", e);
this.openDevices = openDevices;
}
- public void generate(final boolean useSsh, final int batchSize) {
+ public void generate(final boolean useSsh, final int batchSize, final int generateConfigsTimeout) {
if(directory.exists() == false) {
checkState(directory.mkdirs(), "Unable to create folder %s" + directory);
}
configBlueprint = configBlueprint.replace(NETCONF_USE_SSH, "%s");
final String before = configBlueprint.substring(0, configBlueprint.indexOf("<module>"));
- final String middleBlueprint = configBlueprint.substring(configBlueprint.indexOf("<module>"), configBlueprint.indexOf("</module>") + "</module>".length());
+ final String middleBlueprint = configBlueprint.substring(configBlueprint.indexOf("<module>"), configBlueprint.indexOf("</module>"));
final String after = configBlueprint.substring(configBlueprint.indexOf("</module>") + "</module>".length());
int connectorCount = 0;
}
final String name = String.valueOf(openDevice) + SIM_DEVICE_SUFFIX;
- final String configContent = String.format(middleBlueprint, name, String.valueOf(openDevice), String.valueOf(!useSsh));
+ String configContent = String.format(middleBlueprint, name, String.valueOf(openDevice), String.valueOf(!useSsh));
+ configContent = String.format("%s%s%d%s\n%s\n", configContent, "<connection-timeout-millis>", generateConfigsTimeout, "</connection-timeout-millis>", "</module>");
+
b.append(configContent);
connectorCount++;
if(connectorCount == batchSize) {
import io.netty.util.HashedWheelTimer;
import java.io.Closeable;
import java.io.IOException;
-import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import java.net.Inet4Address;
import org.antlr.v4.runtime.ParserRuleContext;
import org.antlr.v4.runtime.tree.ParseTreeWalker;
import org.opendaylight.controller.netconf.api.monitoring.NetconfManagementSession;
+import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
import org.opendaylight.controller.netconf.impl.DefaultCommitNotificationProducer;
import org.opendaylight.controller.netconf.impl.NetconfServerDispatcher;
import org.opendaylight.controller.netconf.impl.NetconfServerSessionNegotiatorFactory;
this.hashedWheelTimer = hashedWheelTimer;
}
- private NetconfServerDispatcher createDispatcher(final Map<ModuleBuilder, String> moduleBuilders) {
+ private NetconfServerDispatcher createDispatcher(final Map<ModuleBuilder, String> moduleBuilders, final boolean exi) {
final Set<Capability> capabilities = Sets.newHashSet(Collections2.transform(moduleBuilders.keySet(), new Function<ModuleBuilder, Capability>() {
@Override
final DefaultCommitNotificationProducer commitNotifier = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
+ final Set<String> serverCapabilities = exi
+ ? NetconfServerSessionNegotiatorFactory.DEFAULT_BASE_CAPABILITIES
+ : Sets.newHashSet(XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_0, XmlNetconfConstants.URN_IETF_PARAMS_NETCONF_BASE_1_1);
+
final NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
- hashedWheelTimer, simulatedOperationProvider, idProvider, CONNECTION_TIMEOUT_MILLIS, commitNotifier, new LoggingMonitoringService());
+ hashedWheelTimer, simulatedOperationProvider, idProvider, CONNECTION_TIMEOUT_MILLIS, commitNotifier, new LoggingMonitoringService(), serverCapabilities);
final NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(
serverNegotiatorFactory);
public List<Integer> start(final Main.Params params) {
final Map<ModuleBuilder, String> moduleBuilders = parseSchemasToModuleBuilders(params);
- final NetconfServerDispatcher dispatcher = createDispatcher(moduleBuilders);
+ final NetconfServerDispatcher dispatcher = createDispatcher(moduleBuilders, params.exi);
int currentPort = params.startingPort;