import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import org.junit.internal.AssumptionViolatedException;
import org.junit.runner.RunWith;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.mdsal.it.base.AbstractMdsalTestBase;
import org.opendaylight.ovsdb.lib.notation.Version;
-import org.opendaylight.ovsdb.schema.openvswitch.Queue;
import org.opendaylight.ovsdb.southbound.SouthboundConstants;
import org.opendaylight.ovsdb.southbound.SouthboundMapper;
import org.opendaylight.ovsdb.southbound.SouthboundUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbFailModeBase;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeAugmentation;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbQueueRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbPortInterfaceAttributes.VlanMode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbQueueRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbTerminationPointAugmentation;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbTerminationPointAugmentationBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.QosTypeBase;
new NotifyingDataChangeListener(LogicalDatastoreType.OPERATIONAL);
- private static class NotifyingDataChangeListener implements DataChangeListener {
+ private static class NotifyingDataChangeListener implements DataTreeChangeListener<DataObject> {
private final LogicalDatastoreType type;
private final Set<InstanceIdentifier<?>> createdIids = new HashSet<>();
private final Set<InstanceIdentifier<?>> removedIids = new HashSet<>();
}
@Override
- public void onDataChanged(
- AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> asyncDataChangeEvent) {
- LOG.info("{} DataChanged: created {}", type, asyncDataChangeEvent.getCreatedData().keySet());
- LOG.info("{} DataChanged: removed {}", type, asyncDataChangeEvent.getRemovedPaths());
- LOG.info("{} DataChanged: updated {}", type, asyncDataChangeEvent.getUpdatedData().keySet());
- createdIids.addAll(asyncDataChangeEvent.getCreatedData().keySet());
- removedIids.addAll(asyncDataChangeEvent.getRemovedPaths());
- updatedIids.addAll(asyncDataChangeEvent.getUpdatedData().keySet());
- // Handled managed iids
- for (DataObject obj : asyncDataChangeEvent.getCreatedData().values()) {
- if (obj instanceof ManagedNodeEntry) {
- ManagedNodeEntry managedNodeEntry = (ManagedNodeEntry) obj;
- LOG.info("{} DataChanged: created managed {}", managedNodeEntry.getBridgeRef().getValue());
- createdIids.add(managedNodeEntry.getBridgeRef().getValue());
+ public void onDataTreeChanged(Collection<DataTreeModification<DataObject>> changes) {
+ for (DataTreeModification<DataObject> change: changes) {
+ DataObjectModification<DataObject> rootNode = change.getRootNode();
+ final InstanceIdentifier<DataObject> identifier = change.getRootPath().getRootIdentifier();
+ switch (rootNode.getModificationType()) {
+ case SUBTREE_MODIFIED:
+ case WRITE:
+ if (rootNode.getDataBefore() == null) {
+ LOG.info("{} DataTreeChanged: created {}", type, identifier);
+ createdIids.add(identifier);
+
+ final DataObject obj = rootNode.getDataAfter();
+ if (obj instanceof ManagedNodeEntry) {
+ ManagedNodeEntry managedNodeEntry = (ManagedNodeEntry) obj;
+ LOG.info("{} DataChanged: created managed {}",
+ managedNodeEntry.getBridgeRef().getValue());
+ createdIids.add(managedNodeEntry.getBridgeRef().getValue());
+ }
+ } else {
+ LOG.info("{} DataTreeChanged: updated {}", type, identifier);
+ updatedIids.add(identifier);
+ }
+ break;
+ case DELETE:
+ LOG.info("{} DataTreeChanged: removed {}", type, identifier);
+ removedIids.add(identifier);
+ break;
+ default:
+ break;
}
}
+
synchronized(this) {
notifyAll();
}
}
public void registerDataChangeListener() {
- dataBroker.registerDataChangeListener(type, iid, this, AsyncDataBroker.DataChangeScope.SUBTREE);
+ dataBroker.registerDataTreeChangeListener(new DataTreeIdentifier<>(type,
+ (InstanceIdentifier)iid), this);
}
public void waitForCreation(long timeout) throws InterruptedException {
synchronized (this) {
long _start = System.currentTimeMillis();
LOG.info("Waiting for {} DataChanged creation on {}", type, iid);
- while (!isCreated(iid) && (System.currentTimeMillis() - _start) < timeout) {
+ while (!isCreated(iid) && System.currentTimeMillis() - _start < timeout) {
wait(RETRY_WAIT);
}
- LOG.info("Woke up, waited {}ms for creation of {}", (System.currentTimeMillis() - _start), iid);
+ LOG.info("Woke up, waited {}ms for creation of {}", System.currentTimeMillis() - _start, iid);
}
}
synchronized (this) {
long _start = System.currentTimeMillis();
LOG.info("Waiting for {} DataChanged deletion on {}", type, iid);
- while (!isRemoved(iid) && (System.currentTimeMillis() - _start) < timeout) {
+ while (!isRemoved(iid) && System.currentTimeMillis() - _start < timeout) {
wait(RETRY_WAIT);
}
- LOG.info("Woke up, waited {}ms for deletion of {}", (System.currentTimeMillis() - _start), iid);
+ LOG.info("Woke up, waited {}ms for deletion of {}", System.currentTimeMillis() - _start, iid);
}
}
synchronized (this) {
long _start = System.currentTimeMillis();
LOG.info("Waiting for {} DataChanged update on {}", type, iid);
- while (!isUpdated(iid) && (System.currentTimeMillis() - _start) < timeout) {
+ while (!isUpdated(iid) && System.currentTimeMillis() - _start < timeout) {
wait(RETRY_WAIT);
}
- LOG.info("Woke up, waited {}ms for update of {}", (System.currentTimeMillis() - _start), iid);
+ LOG.info("Woke up, waited {}ms for update of {}", System.currentTimeMillis() - _start, iid);
}
}
-
}
+ @Override
@Configuration
public Option[] config() {
Option[] options = super.config();
assertTrue("Did not find " + SouthboundUtils.OVSDB_TOPOLOGY_ID.getValue(), getOvsdbTopology());
final ConnectionInfo connectionInfo = getConnectionInfo(addressStr, portNumber);
final InstanceIdentifier<Node> iid = SouthboundUtils.createInstanceIdentifier(connectionInfo);
- dataBroker.registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
- iid, CONFIGURATION_LISTENER, AsyncDataBroker.DataChangeScope.SUBTREE);
- dataBroker.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
- iid, OPERATIONAL_LISTENER, AsyncDataBroker.DataChangeScope.SUBTREE);
+ dataBroker.registerDataTreeChangeListener(new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
+ (InstanceIdentifier)iid), CONFIGURATION_LISTENER);
+ dataBroker.registerDataTreeChangeListener(new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL,
+ (InstanceIdentifier)iid), OPERATIONAL_LISTENER);
ovsdbNode = connectOvsdbNode(connectionInfo);
OvsdbNodeAugmentation ovsdbNodeAugmentation = ovsdbNode.getAugmentation(OvsdbNodeAugmentation.class);
long _start = System.currentTimeMillis();
LOG.info("Waiting for OPERATIONAL DataChanged creation on {}", iid);
while (!OPERATIONAL_LISTENER.isCreated(
- iid) && (System.currentTimeMillis() - _start) < OVSDB_ROUNDTRIP_TIMEOUT) {
+ iid) && System.currentTimeMillis() - _start < OVSDB_ROUNDTRIP_TIMEOUT) {
OPERATIONAL_LISTENER.wait(OVSDB_UPDATE_TIMEOUT);
}
- LOG.info("Woke up, waited {} for creation of {}", (System.currentTimeMillis() - _start), iid);
+ LOG.info("Woke up, waited {} for creation of {}", System.currentTimeMillis() - _start, iid);
}
}
long _start = System.currentTimeMillis();
LOG.info("Waiting for OPERATIONAL DataChanged deletion on {}", iid);
while (!OPERATIONAL_LISTENER.isRemoved(
- iid) && (System.currentTimeMillis() - _start) < OVSDB_ROUNDTRIP_TIMEOUT) {
+ iid) && System.currentTimeMillis() - _start < OVSDB_ROUNDTRIP_TIMEOUT) {
OPERATIONAL_LISTENER.wait(OVSDB_UPDATE_TIMEOUT);
}
- LOG.info("Woke up, waited {} for deletion of {}", (System.currentTimeMillis() - _start), iid);
+ LOG.info("Woke up, waited {} for deletion of {}", System.currentTimeMillis() - _start, iid);
}
}
long _start = System.currentTimeMillis();
LOG.info("Waiting for OPERATIONAL DataChanged update on {}", iid);
while (!OPERATIONAL_LISTENER.isUpdated(
- iid) && (System.currentTimeMillis() - _start) < OVSDB_ROUNDTRIP_TIMEOUT) {
+ iid) && System.currentTimeMillis() - _start < OVSDB_ROUNDTRIP_TIMEOUT) {
OPERATIONAL_LISTENER.wait(OVSDB_UPDATE_TIMEOUT);
}
- LOG.info("Woke up, waited {} for update of {}", (System.currentTimeMillis() - _start), iid);
+ LOG.info("Woke up, waited {} for update of {}", System.currentTimeMillis() - _start, iid);
}
}
private Queues getQueue(Uri queueId, OvsdbNodeAugmentation node) {
for (Queues queue : node.getQueues()) {
- if (queue.getKey().getQueueId().getValue().equals(queueId.getValue()))
+ if (queue.getKey().getQueueId().getValue().equals(queueId.getValue())) {
return queue;
+ }
}
return null;
}
private QosEntries getQos(Uri qosId, OvsdbNodeAugmentation node) {
for (QosEntries qos : node.getQosEntries()) {
- if (qos.getKey().getQosId().equals(qosId))
+ if (qos.getKey().getQosId().equals(qosId)) {
return qos;
+ }
}
return null;
}
@Override
protected void setKey(Builder<InterfaceLldp> builder, String key) {
- ((InterfaceLldpBuilder) builder).setLldpKey((key));
+ ((InterfaceLldpBuilder) builder).setLldpKey(key);
}
@Override
*/
package org.opendaylight.ovsdb.utils.mdsal.utils;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
/**
* This class provides methods for checking or waiting for various md-sal operations to complete.
* Once an instance is created one must invoke the registerDataChangeListener method
* with a DataBroker.
*/
-public class NotifyingDataChangeListener implements AutoCloseable, DataChangeListener {
+public class NotifyingDataChangeListener implements AutoCloseable, DataTreeChangeListener<DataObject> {
private static final Logger LOG = LoggerFactory.getLogger(NotifyingDataChangeListener.class);
private LogicalDatastoreType type;
private final Set<InstanceIdentifier<?>> createdIids = new HashSet<>();
}
@Override
- public void onDataChanged(
- AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> asyncDataChangeEvent) {
+ public void onDataTreeChanged(Collection<DataTreeModification<DataObject>> changes) {
if (!listen) {
return;
}
- if ((mask & BIT_CREATE) == BIT_CREATE) {
- LOG.info("{} DataChanged: created {}", type, asyncDataChangeEvent.getCreatedData().keySet());
- createdIids.addAll(asyncDataChangeEvent.getCreatedData().keySet());
- }
- if ((mask & BIT_UPDATE) == BIT_UPDATE) {
- LOG.info("{} DataChanged: updated {}", type, asyncDataChangeEvent.getUpdatedData().keySet());
- updatedIids.addAll(asyncDataChangeEvent.getUpdatedData().keySet());
- }
- if ((mask & BIT_DELETE) == BIT_DELETE) {
- LOG.info("{} DataChanged: removed {}", type, asyncDataChangeEvent.getRemovedPaths());
- removedIids.addAll(asyncDataChangeEvent.getRemovedPaths());
+
+ for (DataTreeModification<DataObject> change: changes) {
+ DataObjectModification<DataObject> rootNode = change.getRootNode();
+ final InstanceIdentifier<DataObject> identifier = change.getRootPath().getRootIdentifier();
+ switch (rootNode.getModificationType()) {
+ case SUBTREE_MODIFIED:
+ case WRITE:
+ if (rootNode.getDataBefore() == null) {
+ if ((mask & BIT_CREATE) == BIT_CREATE) {
+ LOG.info("{} DataTreeChanged: created {}", type, identifier);
+ createdIids.add(identifier);
+ }
+ } else if ((mask & BIT_UPDATE) == BIT_UPDATE) {
+ LOG.info("{} DataTreeChanged: updated {}", type, identifier);
+ updatedIids.add(identifier);
+ }
+ break;
+ case DELETE:
+ if ((mask & BIT_DELETE) == BIT_DELETE) {
+ LOG.info("{} DataTreeChanged: removed {}", type, identifier);
+ removedIids.add(identifier);
+ }
+ break;
+ default:
+ break;
+ }
}
synchronized (this) {
removedIids.clear();
}
+ @SuppressWarnings({ "rawtypes", "unchecked" })
public void registerDataChangeListener(DataBroker dataBroker) {
- listenerRegistration = dataBroker.registerDataChangeListener(type, iid, this,
- AsyncDataBroker.DataChangeScope.SUBTREE);
+ listenerRegistration = dataBroker.registerDataTreeChangeListener(new DataTreeIdentifier<>(type,
+ (InstanceIdentifier)iid), this);
}
public void waitForCreation() throws InterruptedException {
synchronized (this) {
long _start = System.currentTimeMillis();
LOG.info("Waiting for {} DataChanged creation on {}", type, iid);
- while (!isCreated(iid) && (System.currentTimeMillis() - _start) < timeout) {
+ while (!isCreated(iid) && System.currentTimeMillis() - _start < timeout) {
wait(RETRY_WAIT);
}
- LOG.info("Woke up, waited {}ms for creation of {}", (System.currentTimeMillis() - _start), iid);
+ LOG.info("Woke up, waited {}ms for creation of {}", System.currentTimeMillis() - _start, iid);
}
}
synchronized (this) {
long _start = System.currentTimeMillis();
LOG.info("Waiting for {} DataChanged update on {}", type, iid);
- while (!isUpdated(iid) && (System.currentTimeMillis() - _start) < timeout) {
+ while (!isUpdated(iid) && System.currentTimeMillis() - _start < timeout) {
wait(RETRY_WAIT);
}
- LOG.info("Woke up, waited {}ms for update of {}", (System.currentTimeMillis() - _start), iid);
+ LOG.info("Woke up, waited {}ms for update of {}", System.currentTimeMillis() - _start, iid);
}
}
synchronized (this) {
long _start = System.currentTimeMillis();
LOG.info("Waiting for {} DataChanged deletion on {}", type, iid);
- while (!isRemoved(iid) && (System.currentTimeMillis() - _start) < timeout) {
+ while (!isRemoved(iid) && System.currentTimeMillis() - _start < timeout) {
wait(RETRY_WAIT);
}
- LOG.info("Woke up, waited {}ms for deletion of {}", (System.currentTimeMillis() - _start), iid);
+ LOG.info("Woke up, waited {}ms for deletion of {}", System.currentTimeMillis() - _start, iid);
}
}