private ListenerRegistration<?> listenerRegistration;
private List<NotifyingDataChangeListener> waitList = null;
private int mdsalTimeout = MDSAL_TIMEOUT_OPERATIONAL;
+ private Boolean listen;
+ public static final int BIT_CREATE = 1;
+ public static final int BIT_UPDATE = 2;
+ public static final int BIT_DELETE = 4;
+ public static final int BIT_ALL = 7;
+ private int mask;
+
+ public NotifyingDataChangeListener(LogicalDatastoreType type, int mask,
+ InstanceIdentifier<?> iid, List<NotifyingDataChangeListener> waitList) {
+ this(type, iid, waitList);
+ this.mask = mask;
+ }
/**
* Create a new NotifyingDataChangeListener
- * @param type
+ * @param type DataStore type
* @param iid of the md-sal object we're waiting for
* @param waitList for tracking outstanding changes
*/
this.type = type;
this.iid = iid;
this.waitList = waitList;
- if(this.waitList != null) {
+ if (this.waitList != null) {
this.waitList.add(this);
}
if (type == LogicalDatastoreType.CONFIGURATION) {
mdsalTimeout = MDSAL_TIMEOUT_CONFIG;
}
+ listen = true;
+ mask = BIT_ALL;
}
/**
* Completely reset the state of this NotifyingDataChangeListener.
- * @param type
+ * @param type DataStore type
* @param iid of the md-sal object we're waiting for
* @throws Exception
*/
- private void modify(LogicalDatastoreType type, InstanceIdentifier<?> iid) throws Exception {
+ public void modify(LogicalDatastoreType type, InstanceIdentifier<?> iid) throws Exception {
this.close();
this.clear();
this.type = type;
this.iid = iid;
}
+ public void setlisten(Boolean listen) {
+ this.listen = listen;
+ }
+
+ public void setMask(int mask) {
+ this.mask = mask;
+ }
+
@Override
public void onDataChanged(
AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> asyncDataChangeEvent) {
- LOG.info("{} DataChanged: created {}", type, asyncDataChangeEvent.getCreatedData().keySet());
- LOG.info("{} DataChanged: updated {}", type, asyncDataChangeEvent.getUpdatedData().keySet());
- LOG.info("{} DataChanged: removed {}", type, asyncDataChangeEvent.getRemovedPaths());
- createdIids.addAll(asyncDataChangeEvent.getCreatedData().keySet());
- removedIids.addAll(asyncDataChangeEvent.getRemovedPaths());
- updatedIids.addAll(asyncDataChangeEvent.getUpdatedData().keySet());
+ if (!listen) {
+ return;
+ }
+ if ((mask & BIT_CREATE) == BIT_CREATE) {
+ LOG.info("{} DataChanged: created {}", type, asyncDataChangeEvent.getCreatedData().keySet());
+ createdIids.addAll(asyncDataChangeEvent.getCreatedData().keySet());
+ }
+ if ((mask & BIT_UPDATE) == BIT_UPDATE) {
+ LOG.info("{} DataChanged: updated {}", type, asyncDataChangeEvent.getUpdatedData().keySet());
+ updatedIids.addAll(asyncDataChangeEvent.getUpdatedData().keySet());
+ }
+ if ((mask & BIT_DELETE) == BIT_DELETE) {
+ LOG.info("{} DataChanged: removed {}", type, asyncDataChangeEvent.getRemovedPaths());
+ removedIids.addAll(asyncDataChangeEvent.getRemovedPaths());
+ }
+
synchronized (this) {
notifyAll();
}
public void clear() {
createdIids.clear();
- removedIids.clear();
updatedIids.clear();
+ removedIids.clear();
}
public void registerDataChangeListener(DataBroker dataBroker) {
downCmd[COMPOSE_FILE_IDX] = tmpDockerComposeFile.toString();
execCmd[COMPOSE_FILE_IDX] = tmpDockerComposeFile.toString();
- if (0 == ProcUtils.tryProcess(5000, psCmdNoSudo)) {
+ if (0 == ProcUtils.tryProcess(null, 5000, psCmdNoSudo)) {
LOG.info("DockerOvs.buildDockerComposeCommands docker-compose does not require sudo");
String[] tmp;
tmp = Arrays.copyOfRange(upCmd, 1, upCmd.length);
downCmd = tmp;
tmp = Arrays.copyOfRange(execCmd, 1, execCmd.length);
execCmd = tmp;
- } else if (0 == ProcUtils.tryProcess(5000, psCmd)) {
+ } else if (0 == ProcUtils.tryProcess(null, 5000, psCmd)) {
LOG.info("DockerOvs.buildDockerComposeCommands docker-compose requires sudo");
} else {
Assert.fail("docker-compose does not seem to work with or without sudo");
ProcUtils.runProcess(waitFor, cmd);
}
- public void tryInContainer(int waitFor, int numOvs, String ... cmdWords) throws IOException, InterruptedException {
+ public void tryInContainer(String logText, int waitFor, int numOvs, String ... cmdWords) throws IOException, InterruptedException {
String[] pfx = getExecCmdPrefix(numOvs);
String[] cmd = new String[pfx.length + cmdWords.length];
System.arraycopy(pfx, 0, cmd, 0, pfx.length);
System.arraycopy(cmdWords, 0, cmd, pfx.length, cmdWords.length);
- ProcUtils.tryProcess(waitFor, cmd);
+ ProcUtils.tryProcess(logText, waitFor, cmd);
}
/**
* @throws IOException If something goes wrong with reading the process output
* @throws InterruptedException because there's some sleeping in here
*/
- public void logState(int dockerInstance) throws IOException, InterruptedException {
- tryInContainer(5000, dockerInstance, "ip", "addr");
- tryInContainer(5000, dockerInstance, "ovs-vsctl", "show");
- tryInContainer(5000, dockerInstance, "ovs-ofctl", "-OOpenFlow13", "show", "br-int");
- tryInContainer(5000, dockerInstance, "ovs-ofctl", "-OOpenFlow13", "dump-flows", "br-int");
- tryInContainer(5000, dockerInstance, "ip", "netns", "list");
+ public void logState(int dockerInstance, String logText) throws IOException, InterruptedException {
+ tryInContainer(logText, 5000, dockerInstance, "ip", "addr");
+ tryInContainer(logText, 5000, dockerInstance, "ovs-vsctl", "show");
+ tryInContainer(logText, 5000, dockerInstance, "ovs-ofctl", "-OOpenFlow13", "show", "br-int");
+ tryInContainer(logText, 5000, dockerInstance, "ovs-ofctl", "-OOpenFlow13", "dump-flows", "br-int");
+ tryInContainer(logText, 5000, dockerInstance, "ip", "netns", "list");
}
}
bridgeIid = SouthboundUtils.createInstanceIdentifier(connectionInfo, INTEGRATION_BRIDGE_NAME);
}
+ private void addWaiters() {
+ ovsdbWaiter = new NotifyingDataChangeListener(LogicalDatastoreType.OPERATIONAL,
+ NotifyingDataChangeListener.BIT_CREATE, ovsdbIid, waitList);
+ ovsdbWaiter.registerDataChangeListener(itUtils.dataBroker);
+ bridgeWaiter = new NotifyingDataChangeListener(LogicalDatastoreType.OPERATIONAL,
+ NotifyingDataChangeListener.BIT_CREATE, bridgeIid, waitList);
+ bridgeWaiter.registerDataChangeListener(itUtils.dataBroker);
+ }
+
+ private void closeWaiters() throws Exception {
+ ovsdbWaiter.close();
+ bridgeWaiter.close();
+ }
+
/**
* Connect to the OVSDB node, wait for the connection to be established and for the integration bridge
* to be successfully created. Contains assertions for unexpected states
* @throws InterruptedException if interrupted while waiting for connection
*/
- public void connect() throws InterruptedException {
- ovsdbWaiter = new NotifyingDataChangeListener(LogicalDatastoreType.OPERATIONAL, ovsdbIid, waitList);
- ovsdbWaiter.registerDataChangeListener(itUtils.dataBroker);
- bridgeWaiter = new NotifyingDataChangeListener(LogicalDatastoreType.OPERATIONAL, bridgeIid, waitList);
- bridgeWaiter.registerDataChangeListener(itUtils.dataBroker);
+ public void connect() throws Exception {
+ addWaiters();
assertNotNull("connection failed", itUtils.southboundUtils.addOvsdbNode(connectionInfo, 0));
* Remove integration bridge and teardown connection. Contains assertions for unexpected states.
* @throws InterruptedException if interrupted while waiting for disconnect to complete
*/
- public void disconnect() throws InterruptedException {
+ public void disconnect() throws Exception {
+ ovsdbWaiter.setMask(NotifyingDataChangeListener.BIT_DELETE);
+ bridgeWaiter.setMask(NotifyingDataChangeListener.BIT_DELETE);
assertTrue(itUtils.southboundUtils.deleteBridge(connectionInfo, INTEGRATION_BRIDGE_NAME, 0));
bridgeWaiter.waitForDeletion();
Node bridgeNode = itUtils.mdsalUtils.read(LogicalDatastoreType.OPERATIONAL, bridgeIid);
ovsdbWaiter.waitForDeletion();
Node ovsdbNode = itUtils.mdsalUtils.read(LogicalDatastoreType.OPERATIONAL, ovsdbIid);
assertNull("Ovsdb node should not be found", ovsdbNode);
+ closeWaiters();
}
}
* @throws IOException if something goes wrong on the IO end
* @throws InterruptedException If this thread is interrupted
*/
- public static void runProcess(long waitFor,StringBuilder capturedStdout, String... words)
+ public static void runProcess(long waitFor, StringBuilder capturedStdout, String... words)
throws IOException, InterruptedException {
- int exitValue = tryProcess(waitFor, capturedStdout, words);
+ int exitValue = tryProcess(null, waitFor, capturedStdout, words);
Assert.assertEquals("ProcUtils.runProcess exit code is not 0", 0, exitValue);
}
* @throws IOException if something goes wrong on the IO end
* @throws InterruptedException If this thread is interrupted
*/
- public static int tryProcess(long waitFor, String... words) throws IOException, InterruptedException {
- return tryProcess(waitFor, null, words);
+ public static int tryProcess(String logText, long waitFor, String... words)
+ throws IOException, InterruptedException {
+ return tryProcess(logText, waitFor, null, words);
}
/**
* @throws IOException if something goes wrong on the IO end
* @throws InterruptedException If this thread is interrupted
*/
- public static int tryProcess(long waitFor, StringBuilder capturedStdout, String... words)
+ public static int tryProcess(String logText, long waitFor, StringBuilder capturedStdout, String... words)
throws IOException, InterruptedException {
- LOG.info("ProcUtils.runProcess running \"{}\", waitFor {}", words, waitFor);
+ LOG.info("ProcUtils.runProcess {} running \"{}\", waitFor {}",
+ logText != null ? logText : "", words, waitFor);
Process proc = new ProcessBuilder(words).start();
int exitValue = -1;
exitValue = waitForExitValue(waitFor, proc);
while (stderr.ready()) {
- LOG.warn("ProcUtils.runProcess [stderr]: {}", stderr.readLine());
+ LOG.warn("ProcUtils.runProcess {} [stderr]: {}",
+ logText != null ? logText : "", stderr.readLine());
}
StringBuilder stdoutStringBuilder = (capturedStdout != null) ? capturedStdout : new StringBuilder();
stdoutStringBuilder.append(buf, 0, read);
}
- LOG.info("ProcUtils.runProcess [stdout]:\n{}", stdoutStringBuilder.toString());
+ LOG.info("ProcUtils.runProcess {} [stdout]:\n{}",
+ logText != null ? logText : "", stdoutStringBuilder.toString());
}
return exitValue;