package org.opendaylight.openflowplugin.applications.frsync;
import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
* @param flowcapableNodePath path to openflow augmentation of node
* @param configTree configured node
* @param operationalTree device reflection
+ * @param dsType type of DS change
* @return synchronization outcome
*/
ListenableFuture<Boolean> syncup(InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
- FlowCapableNode configTree, FlowCapableNode operationalTree) throws InterruptedException;
+ FlowCapableNode configTree, FlowCapableNode operationalTree,
+ LogicalDatastoreType dsType) throws InterruptedException;
}
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeOdlDao;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.SyncPlanPushStrategyFlatBatchImpl;
+import org.opendaylight.openflowplugin.applications.frsync.util.RetryRegistry;
import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
import org.opendaylight.openflowplugin.common.wait.SimpleTaskRetryLooper;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
private ListenerRegistration<NodeListener> dataTreeConfigChangeListener;
private ListenerRegistration<NodeListener> dataTreeOperationalChangeListener;
-
public ForwardingRulesSyncProvider(final BindingAwareBroker broker,
final DataBroker dataBroker,
final RpcConsumerRegistry rpcRegistry) {
.setFlatBatchService(flatBatchService)
.setTableForwarder(tableForwarder);
- final SyncReactorImpl syncReactorImpl = new SyncReactorImpl(syncPlanPushStrategy);
- final SyncReactor syncReactorGuard = new SyncReactorGuardDecorator(syncReactorImpl,
+ final RetryRegistry retryRegistry = new RetryRegistry();
+
+ final SyncReactor syncReactorImpl = new SyncReactorImpl(syncPlanPushStrategy);
+ final SyncReactor syncReactorRetry = new SyncReactorRetryDecorator(syncReactorImpl, retryRegistry);
+ final SyncReactor syncReactorGuard = new SyncReactorGuardDecorator(syncReactorRetry,
new SemaphoreKeeperGuavaImpl<InstanceIdentifier<FlowCapableNode>>(1, true));
- final SyncReactor cfgReactor = new SyncReactorFutureWithCompressionDecorator(syncReactorGuard, syncThreadPool);
- final SyncReactor operReactor = new SyncReactorFutureWithCompressionDecorator(syncReactorGuard, syncThreadPool);
+ final SyncReactor reactor = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool);
final FlowCapableNodeSnapshotDao configSnapshot = new FlowCapableNodeSnapshotDao();
final FlowCapableNodeSnapshotDao operationalSnapshot = new FlowCapableNodeSnapshotDao();
final FlowCapableNodeDao operationalDao = new FlowCapableNodeCachedDao(operationalSnapshot,
new FlowCapableNodeOdlDao(dataService, LogicalDatastoreType.OPERATIONAL));
- final NodeListener<FlowCapableNode> nodeListenerConfig = new SimplifiedConfigListener(cfgReactor, configSnapshot, operationalDao);
- final NodeListener<Node> nodeListenerOperational = new SimplifiedOperationalListener(operReactor, operationalSnapshot, configDao);
+ final NodeListener<FlowCapableNode> nodeListenerConfig =
+ new SimplifiedConfigListener(reactor, configSnapshot, operationalDao);
+ final NodeListener<Node> nodeListenerOperational =
+ new SimplifiedOperationalRetryListener(reactor, operationalSnapshot, configDao, retryRegistry);
try {
SimpleTaskRetryLooper looper1 = new SimpleTaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
final Optional<FlowCapableNode> operationalNode = operationalDao.loadByNodeId(nodeId);
if (!operationalNode.isPresent()) {
LOG.info("Skip syncup, {} operational is not present", nodeId.getValue());
- return Optional.absent();// we try to reconfigure switch is alive
+ return Optional.absent();
}
final DataObjectModification<FlowCapableNode> configModification = modification.getRootNode();
FlowCapableNode dataBefore, FlowCapableNode dataAfter, FlowCapableNode operationalNode)
throws InterruptedException {
LOG.trace("onNodeAdded {}", nodePath);
-
- final ListenableFuture<Boolean> endResult =
- reactor.syncup(nodePath, dataAfter, operationalNode);
+ final ListenableFuture<Boolean> endResult = reactor.syncup(nodePath, dataAfter, operationalNode, dsType());
return endResult;
}
FlowCapableNode dataBefore, FlowCapableNode dataAfter, FlowCapableNode operationalNodeNode)
throws InterruptedException {
LOG.trace("onNodeUpdated {}", nodePath);
-
- final ListenableFuture<Boolean> endResult =
- reactor.syncup(nodePath, dataAfter, dataBefore);
+ final ListenableFuture<Boolean> endResult = reactor.syncup(nodePath, dataAfter, dataBefore, dsType());
return endResult;
}
protected ListenableFuture<Boolean> onNodeDeleted(InstanceIdentifier<FlowCapableNode> nodePath,
FlowCapableNode dataBefore, FlowCapableNode operationalNode) throws InterruptedException {
LOG.trace("onNodeDeleted {}", nodePath);
-
- final ListenableFuture<Boolean> endResult =
- reactor.syncup(nodePath, null, dataBefore);
+ final ListenableFuture<Boolean> endResult = reactor.syncup(nodePath, null, dataBefore, dsType());
return endResult;
}
public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node> {
private static final Logger LOG = LoggerFactory.getLogger(SimplifiedOperationalListener.class);
- private final SyncReactor reactor;
- private FlowCapableNodeSnapshotDao operationalSnapshot;
- private FlowCapableNodeDao configDao;
+ protected final SyncReactor reactor;
+ protected final FlowCapableNodeSnapshotDao operationalSnapshot;
+ protected final FlowCapableNodeDao configDao;
- public SimplifiedOperationalListener(SyncReactor reactor,
- FlowCapableNodeSnapshotDao operationalSnapshot, FlowCapableNodeDao configDao) {
+ public SimplifiedOperationalListener(SyncReactor reactor, FlowCapableNodeSnapshotDao operationalSnapshot,
+ FlowCapableNodeDao configDao) {
this.reactor = reactor;
this.operationalSnapshot = operationalSnapshot;
this.configDao = configDao;
*/
protected Optional<ListenableFuture<Boolean>> processNodeModification(
DataTreeModification<Node> modification) throws ReadFailedException, InterruptedException {
- updateCache(modification);
- if (isAdd(modification) || isAddLogical(modification)) {
+ updateCache(modification);
+ if (isReconciliationNeeded(modification)) {
return reconciliation(modification);
}
- // TODO: else = explicit reconciliation required
-
return skipModification(modification);
}
* Remove if delete. Update only if FlowCapableNode Augmentation modified.
*
* @param modification Datastore modification
+ * @return true for cache update, false for cache remove
*/
- protected void updateCache(DataTreeModification<Node> modification) {
- try {
- boolean isDelete = isDelete(modification) || isDeleteLogical(modification);
- if (isDelete) {
- operationalSnapshot.updateCache(nodeId(modification), Optional.<FlowCapableNode>absent());
- return;
- }
-
- operationalSnapshot.updateCache(nodeId(modification), Optional.fromNullable(flowCapableNodeAfter(modification)));
- } catch(Exception e) {
- LOG.error("update cache failed {}", nodeId(modification), e);
+ protected boolean updateCache(DataTreeModification<Node> modification) {
+ if (isDelete(modification) || isDeleteLogical(modification)) {
+ operationalSnapshot.updateCache(nodeId(modification), Optional.<FlowCapableNode>absent());
+ return false;
}
+ operationalSnapshot.updateCache(nodeId(modification), Optional.fromNullable(flowCapableNodeAfter(modification)));
+ return true;
}
- protected Optional<ListenableFuture<Boolean>> skipModification(DataTreeModification<Node> modification) {
+ private Optional<ListenableFuture<Boolean>> skipModification(DataTreeModification<Node> modification) {
LOG.trace("Skipping Inventory Operational modification {}, before {}, after {}", nodeIdValue(modification),
modification.getRootNode().getDataBefore() == null ? "null" : "nonnull",
modification.getRootNode().getDataAfter() == null ? "null" : "nonnull");
- return Optional.absent();// skip otherwise event
+ return Optional.absent();
}
/**
* ModificationType.DELETE
*/
- protected boolean isDelete(DataTreeModification<Node> modification) {
+ private boolean isDelete(DataTreeModification<Node> modification) {
if (ModificationType.DELETE == modification.getRootNode().getModificationType()) {
LOG.trace("Delete {} (physical)", nodeIdValue(modification));
return true;
/**
* All connectors disappeared from operational store (logical delete).
*/
- protected boolean isDeleteLogical(DataTreeModification<Node> modification) {
+ private boolean isDeleteLogical(DataTreeModification<Node> modification) {
final DataObjectModification<Node> rootNode = modification.getRootNode();
if (!safeConnectorsEmpty(rootNode.getDataBefore()) && safeConnectorsEmpty(rootNode.getDataAfter())) {
LOG.trace("Delete {} (logical)", nodeIdValue(modification));
return false;
}
- protected boolean isAdd(DataTreeModification<Node> modification) {
+ private boolean isAdd(DataTreeModification<Node> modification) {
final DataObjectModification<Node> rootNode = modification.getRootNode();
final Node dataAfter = rootNode.getDataAfter();
final Node dataBefore = rootNode.getDataBefore();
/**
* All connectors appeared in operational store (logical add).
*/
- protected boolean isAddLogical(DataTreeModification<Node> modification) {
+ private boolean isAddLogical(DataTreeModification<Node> modification) {
final DataObjectModification<Node> rootNode = modification.getRootNode();
if (safeConnectorsEmpty(rootNode.getDataBefore()) && !safeConnectorsEmpty(rootNode.getDataAfter())) {
LOG.trace("Add {} (logical)", nodeIdValue(modification));
return false;
}
- protected Optional<ListenableFuture<Boolean>> reconciliation(
- DataTreeModification<Node> modification) throws InterruptedException {
- final NodeId nodeId = nodeId(modification);
-
- LOG.debug("Reconciliation: {}", nodeId.getValue());
+ protected boolean isReconciliationNeeded(DataTreeModification<Node> modification) {
+ return isAdd(modification) || isAddLogical(modification);
+ }
+ private Optional<ListenableFuture<Boolean>> reconciliation(DataTreeModification<Node> modification) throws InterruptedException {
+ final NodeId nodeId = nodeId(modification);
final Optional<FlowCapableNode> nodeConfiguration = configDao.loadByNodeId(nodeId);
- final InstanceIdentifier<FlowCapableNode> nodePath = InstanceIdentifier.create(Nodes.class)
- .child(Node.class, new NodeKey(nodeId(modification))).augmentation(FlowCapableNode.class);
- if (nodeConfiguration.isPresent())
- return Optional.of(reactor.syncup(nodePath, nodeConfiguration.get(), flowCapableNodeAfter(modification)));
- else
+ if (nodeConfiguration.isPresent()) {
+ LOG.debug("Reconciliation: {}", nodeId.getValue());
+ final InstanceIdentifier<FlowCapableNode> nodePath = InstanceIdentifier.create(Nodes.class)
+ .child(Node.class, new NodeKey(nodeId(modification))).augmentation(FlowCapableNode.class);
+ return Optional.of(reactor.syncup(nodePath, nodeConfiguration.get(), flowCapableNodeAfter(modification), dsType()));
+ } else {
return skipModification(modification);
+ }
}
static FlowCapableNode flowCapableNodeAfter(DataTreeModification<Node> modification) {
final DataObjectModification<Node> rootNode = modification.getRootNode();
final Node dataAfter = rootNode.getDataAfter();
-
if (dataAfter != null) {
return dataAfter.getId();
}
--- /dev/null
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
+import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
+import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
+import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
+import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
+import org.opendaylight.openflowplugin.applications.frsync.util.RetryRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEnd;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Modified {@link SimplifiedOperationalListener} for usage of retry mechanism.
+ */
+public class SimplifiedOperationalRetryListener extends SimplifiedOperationalListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SimplifiedOperationalRetryListener.class);
+ private final RetryRegistry retryRegistry;
+
+ public SimplifiedOperationalRetryListener(SyncReactor reactor, FlowCapableNodeSnapshotDao operationalSnapshot,
+ FlowCapableNodeDao configDao, RetryRegistry retryRegistry) {
+ super(reactor, operationalSnapshot, configDao);
+ this.retryRegistry = retryRegistry;
+ }
+
+ /**
+ * Adding condition check for retry.
+ *
+ * @param modification operational datastore modification
+ * @return true if reconciliation is needed, false otherwise
+ */
+ protected boolean isReconciliationNeeded(DataTreeModification<Node> modification) {
+ return super.isReconciliationNeeded(modification) || isRegisteredAndConsistentForRetry(modification);
+ }
+
+ /**
+ * If node is removed unregister for retry in addition.
+ *
+ * @param modification operational datastore modification
+ * @return true for cache update, false for cache remove and retry unregister
+ */
+ protected boolean updateCache(DataTreeModification<Node> modification) {
+ boolean nodeUpdated = super.updateCache(modification);
+ if (!nodeUpdated) { // node removed if not updated
+ retryRegistry.unregisterIfRegistered(nodeId(modification));
+ }
+ return nodeUpdated;
+ }
+
+ /**
+ * Check if retry should be proceeded.
+ *
+ * @param modification operational modification
+ * @return true if device is registered for retry and actual modification is consistent, false otherwise
+ */
+ @VisibleForTesting
+ boolean isRegisteredAndConsistentForRetry(DataTreeModification<Node> modification) {
+ final NodeId nodeId = PathUtil.digNodeId(modification.getRootPath().getRootIdentifier());
+
+ if (!retryRegistry.isRegistered(nodeId)) {
+ return false;
+ }
+
+ final FlowCapableStatisticsGatheringStatus gatheringStatus = modification.getRootNode().getDataAfter()
+ .getAugmentation(FlowCapableStatisticsGatheringStatus.class);
+
+ if (gatheringStatus == null) {
+ LOG.trace("Statistics gathering never started for: {}", nodeId.getValue());
+ return false;
+ }
+
+ final SnapshotGatheringStatusEnd gatheringStatusEnd = gatheringStatus.getSnapshotGatheringStatusEnd();
+
+ if (gatheringStatusEnd == null) {
+ LOG.trace("Statistics gathering is not over yet for: {}", nodeId.getValue());
+ return false;
+ }
+
+ if (!gatheringStatusEnd.isSucceeded()) {
+ LOG.debug("Statistics gathering was not successful for: {}", nodeId.getValue());
+ return false;
+ }
+
+ try {
+ Date timestampOfRegistration = retryRegistry.getRegistration(nodeId);;
+ final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(RetryRegistry.DATE_AND_TIME_FORMAT);
+ Date timestampOfStatistics = simpleDateFormat.parse(gatheringStatusEnd.getEnd().getValue());
+ if (timestampOfStatistics.after(timestampOfRegistration)) {
+ LOG.debug("Fresh operational present for: {} -> going retry!", nodeId.getValue());
+ return true;
+ }
+ } catch (ParseException e) {
+ LOG.error("Timestamp parsing error {}", e);
+ }
+ LOG.debug("Fresh operational not present for: {}", nodeId.getValue());
+ return false;
+ }
+}
import java.util.concurrent.Callable;\r
import java.util.concurrent.TimeUnit;\r
import java.util.concurrent.TimeoutException;\r
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;\r
import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;\r
import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;\r
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;\r
}\r
\r
public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
- final FlowCapableNode configTree, final FlowCapableNode operationalTree) throws InterruptedException {\r
+ final FlowCapableNode configTree, final FlowCapableNode operationalTree,\r
+ final LogicalDatastoreType dsType) throws InterruptedException {\r
final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
LOG.trace("syncup {}", nodeId.getValue());\r
\r
final String oldThreadName = updateThreadName(nodeId);\r
\r
try {\r
- final Boolean ret = doSyncupInFuture(flowcapableNodePath, configTree, operationalTree)\r
+ final Boolean ret = doSyncupInFuture(flowcapableNodePath, configTree, operationalTree, dsType)\r
.get(10000, TimeUnit.MILLISECONDS);\r
LOG.trace("ret {} {}", nodeId.getValue(), ret);\r
return true;\r
}\r
}\r
});\r
- \r
+\r
return syncup;\r
}\r
\r
protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
- final FlowCapableNode configTree, final FlowCapableNode operationalTree)\r
- throws InterruptedException {\r
+ final FlowCapableNode configTree, final FlowCapableNode operationalTree,\r
+ final LogicalDatastoreType dsType) throws InterruptedException {\r
final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
LOG.trace("doSyncupInFuture {}", nodeId.getValue());\r
\r
- return delegate.syncup(flowcapableNodePath, configTree, operationalTree);\r
+ return delegate.syncup(flowcapableNodePath, configTree, operationalTree, dsType);\r
}\r
\r
static String threadName() {\r
+++ /dev/null
-/**\r
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.\r
- *\r
- * This program and the accompanying materials are made available under the\r
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
- * and is available at http://www.eclipse.org/legal/epl-v10.html\r
- */\r
-\r
-package org.opendaylight.openflowplugin.applications.frsync.impl;\r
-\r
-import com.google.common.util.concurrent.Futures;\r
-import com.google.common.util.concurrent.ListenableFuture;\r
-import com.google.common.util.concurrent.ListeningExecutorService;\r
-import java.util.HashMap;\r
-import java.util.Map;\r
-import java.util.concurrent.Semaphore;\r
-import javax.annotation.concurrent.GuardedBy;\r
-import org.apache.commons.lang3.tuple.Pair;\r
-import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;\r
-import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;\r
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;\r
-import org.slf4j.Logger;\r
-import org.slf4j.LoggerFactory;\r
-\r
-/**\r
- * Enriches {@link SyncReactorFutureDecorator} with state compression.\r
- */\r
-public class SyncReactorFutureWithCompressionDecorator extends SyncReactorFutureDecorator {\r
-\r
- private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureWithCompressionDecorator.class);\r
-\r
- @GuardedBy("compressionGuard")\r
- final Map<InstanceIdentifier<FlowCapableNode>, Pair<FlowCapableNode, FlowCapableNode>> compressionQueue =\r
- new HashMap<>();\r
- final Semaphore compressionGuard = new Semaphore(1, false);\r
-\r
- public SyncReactorFutureWithCompressionDecorator(SyncReactor delegate, ListeningExecutorService executorService) {\r
- super(delegate, executorService);\r
- }\r
-\r
- public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
- final FlowCapableNode configTree, final FlowCapableNode operationalTree) throws InterruptedException {\r
- final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
- LOG.trace("syncup {}", nodeId.getValue());\r
-\r
- try {\r
- compressionGuard.acquire();\r
-\r
- final boolean newFutureNecessary = updateCompressionState(flowcapableNodePath, configTree, operationalTree);\r
- if (newFutureNecessary) {\r
- super.syncup(flowcapableNodePath, configTree, operationalTree);\r
- }\r
- return Futures.immediateFuture(true);\r
- } finally {\r
- compressionGuard.release();\r
- }\r
- }\r
-\r
- protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
- final FlowCapableNode configTree, final FlowCapableNode operationalTree)\r
- throws InterruptedException {\r
- final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
- LOG.trace("doSyncupInFuture {}", nodeId.getValue());\r
-\r
- final Pair<FlowCapableNode, FlowCapableNode> lastCompressionState =\r
- removeLastCompressionState(flowcapableNodePath);\r
- if (lastCompressionState == null) {\r
- return Futures.immediateFuture(true);\r
- } else {\r
- return super.doSyncupInFuture(flowcapableNodePath,\r
- lastCompressionState.getLeft(), lastCompressionState.getRight());\r
- }\r
- }\r
-\r
- protected boolean updateCompressionState(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
- final FlowCapableNode configTree, final FlowCapableNode operationalTree) {\r
- final Pair<FlowCapableNode, FlowCapableNode> previous = compressionQueue.get(flowcapableNodePath);\r
- if (previous != null) {\r
- final FlowCapableNode previousOperational = previous.getRight();\r
- compressionQueue.put(flowcapableNodePath, Pair.of(configTree, previousOperational));\r
- return false;\r
- } else {\r
- compressionQueue.put(flowcapableNodePath, Pair.of(configTree, operationalTree));\r
- return true;\r
- }\r
- }\r
-\r
- protected Pair<FlowCapableNode/* config */, FlowCapableNode/* operational */> removeLastCompressionState(\r
- final InstanceIdentifier<FlowCapableNode> flowcapableNodePath) {\r
- try {\r
- try {\r
- compressionGuard.acquire();\r
- } catch (InterruptedException e) {\r
- return null;\r
- }\r
-\r
- return compressionQueue.remove(flowcapableNodePath);\r
- } finally {\r
- compressionGuard.release();\r
- }\r
- }\r
-}\r
--- /dev/null
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
+import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
+import org.opendaylight.openflowplugin.applications.frsync.util.ZipQueueEntry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Enriches {@link SyncReactorFutureDecorator} with state compression.
+ */
+public class SyncReactorFutureZipDecorator extends SyncReactorFutureDecorator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureZipDecorator.class);
+
+ @GuardedBy("compressionGuard")
+ protected final Map<InstanceIdentifier<FlowCapableNode>, ZipQueueEntry> compressionQueue = new HashMap<>();
+ protected final Semaphore compressionGuard = new Semaphore(1, false);
+
+ public SyncReactorFutureZipDecorator(SyncReactor delegate, ListeningExecutorService executorService) {
+ super(delegate, executorService);
+ }
+
+ public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
+ final FlowCapableNode configTree, final FlowCapableNode operationalTree,
+ final LogicalDatastoreType dsType) throws InterruptedException {
+ final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
+ LOG.trace("syncup zip {}", nodeId.getValue());
+
+ try {
+ compressionGuard.acquire();
+
+ final boolean newFutureNecessary = updateCompressionState(flowcapableNodePath, configTree, operationalTree, dsType);
+ if (newFutureNecessary) {
+ super.syncup(flowcapableNodePath, configTree, operationalTree, dsType);
+ }
+ return Futures.immediateFuture(true);
+ } finally {
+ compressionGuard.release();
+ }
+ }
+
+ protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
+ final FlowCapableNode configTree, final FlowCapableNode operationalTree,
+ final LogicalDatastoreType dsType) throws InterruptedException {
+ final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
+ LOG.trace("doSyncupInFuture zip {}", nodeId.getValue());
+
+ final ZipQueueEntry lastCompressionState = removeLastCompressionState(flowcapableNodePath);
+ if (lastCompressionState == null) {
+ return Futures.immediateFuture(true);
+ } else {
+ return super.doSyncupInFuture(flowcapableNodePath,
+ lastCompressionState.getLeft(), lastCompressionState.getRight(), dsType);
+ }
+ }
+
+ /**
+ * If there is config delta in compression queue for the device and new configuration is coming,
+ * update its zip queue entry. Create/replace zip queue entry for the device with operational delta otherwise.
+ */
+ protected boolean updateCompressionState(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
+ final FlowCapableNode configTree, final FlowCapableNode operationalTree,
+ final LogicalDatastoreType dsType) {
+ final ZipQueueEntry previousEntry = compressionQueue.get(flowcapableNodePath);
+
+ if (previousEntry != null && dsType == LogicalDatastoreType.CONFIGURATION
+ && previousEntry.getDsType() == LogicalDatastoreType.CONFIGURATION) {
+ putOptimizedConfigDelta(flowcapableNodePath, configTree, previousEntry);
+ } else {
+ putLatestOperationalDelta(flowcapableNodePath, configTree, operationalTree, dsType);
+ }
+ return previousEntry == null;
+ }
+
+ private void putOptimizedConfigDelta(InstanceIdentifier<FlowCapableNode> flowcapableNodePath, FlowCapableNode configTree,
+ ZipQueueEntry previous) {
+ compressionQueue.put(flowcapableNodePath, new ZipQueueEntry(configTree, previous.getRight(), previous.getDsType()));
+ }
+
+ private void putLatestOperationalDelta(InstanceIdentifier<FlowCapableNode> flowcapableNodePath, FlowCapableNode configTree,
+ FlowCapableNode operationalTree, LogicalDatastoreType dsType) {
+ compressionQueue.put(flowcapableNodePath, new ZipQueueEntry(configTree, operationalTree, dsType));
+ }
+
+ private ZipQueueEntry removeLastCompressionState(
+ final InstanceIdentifier<FlowCapableNode> flowcapableNodePath) {
+ try {
+ try {
+ compressionGuard.acquire();
+ } catch (InterruptedException e) {
+ return null;
+ }
+
+ return compressionQueue.remove(flowcapableNodePath);
+ } finally {
+ compressionGuard.release();
+ }
+ }
+
+ @VisibleForTesting
+ Map<InstanceIdentifier<FlowCapableNode>, ZipQueueEntry> getCompressionQueue() {
+ return compressionQueue;
+ }
+}
\ No newline at end of file
import java.util.concurrent.Semaphore;\r
import java.util.concurrent.TimeUnit;\r
import javax.annotation.Nullable;\r
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;\r
import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;\r
import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;\r
import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;\r
}\r
\r
public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
- final FlowCapableNode configTree, final FlowCapableNode operationalTree) throws InterruptedException {\r
+ final FlowCapableNode configTree, final FlowCapableNode operationalTree,\r
+ final LogicalDatastoreType dsType) throws InterruptedException {\r
final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
LOG.trace("syncup {}", nodeId.getValue());\r
\r
formatNanos(stampAfterGuard - stampBeforeGuard),\r
guard, threadName());\r
}\r
- \r
+\r
final ListenableFuture<Boolean> endResult =\r
- delegate.syncup(flowcapableNodePath, configTree, operationalTree);//TODO handle InteruptedException\r
- \r
+ delegate.syncup(flowcapableNodePath, configTree, operationalTree, dsType);//TODO handle InteruptedException\r
+\r
Futures.addCallback(endResult, new FutureCallback<Boolean>() {\r
@Override\r
public void onSuccess(@Nullable final Boolean result) {\r
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.SynchronizationDiffInput;
@Override
public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> nodeIdent,
- final FlowCapableNode configTree, final FlowCapableNode operationalTree) {
+ final FlowCapableNode configTree, final FlowCapableNode operationalTree,
+ final LogicalDatastoreType dsType) {
LOG.trace("syncup {} cfg:{} oper:{}", nodeIdent, configTree == null ? "is null" : "non null", operationalTree == null ? "is null" : "non null");
final SyncCrudCounters counters = new SyncCrudCounters();
);
}
+ LOG.trace("syncup errors: {}", input.getErrors());
return input.isSuccessful();
}
});
--- /dev/null
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
+import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
+import org.opendaylight.openflowplugin.applications.frsync.util.RetryRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Adding retry mechanism in case of unsuccessful syncup.
+ */
+public class SyncReactorRetryDecorator implements SyncReactor{
+
+ private static final Logger LOG = LoggerFactory.getLogger(SyncReactorRetryDecorator.class);
+
+ private final SyncReactor delegate;
+ private final RetryRegistry retryRegistry;
+
+ public SyncReactorRetryDecorator (final SyncReactor delegate, RetryRegistry retryRegistry) {
+ this.delegate = delegate;
+ this.retryRegistry = retryRegistry;
+ }
+
+ public ListenableFuture<Boolean> syncup (final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
+ final FlowCapableNode configTree, final FlowCapableNode operationalTree,
+ final LogicalDatastoreType dsType) throws InterruptedException {
+
+ final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
+ LOG.trace("syncup retry {}", nodeId.getValue());
+
+ if (dsType == LogicalDatastoreType.CONFIGURATION && retryRegistry.isRegistered(nodeId)) {
+ LOG.trace("Config change ignored because device is in retry [{}]", nodeId);
+ return Futures.immediateFuture(Boolean.FALSE);
+ }
+
+ ListenableFuture<Boolean> syncupResult = delegate.syncup(flowcapableNodePath, configTree, operationalTree, dsType);
+
+ return Futures.transform(syncupResult, new Function<Boolean, Boolean>() {
+ @Override
+ public Boolean apply(Boolean result) {
+ LOG.trace("syncup ret in retry {}", result);
+ if (result) {
+ retryRegistry.unregisterIfRegistered(nodeId);
+ return true;
+ } else {
+ retryRegistry.register(nodeId);
+ // TODO elicit statistics gathering if not running actually
+ // triggerStatisticsGathering(nodeId);
+ return false;
+ }
+ }
+ });
+ }
+}
--- /dev/null
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.util;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holder of registration request for fresh operational.
+ */
+public class RetryRegistry {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RetryRegistry.class);
+ private final Map<NodeId, Date> registration = new ConcurrentHashMap<>();
+ public static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
+
+ public Date register(NodeId nodeId) {
+ Date timestamp = new Date();
+ registration.put(nodeId, timestamp);
+ if (timestamp != null) {
+ LOG.debug("Registered for next consistent operational: {}", nodeId.getValue());
+ }
+ return timestamp;
+ }
+
+ public Date unregisterIfRegistered(NodeId nodeId) {
+ Date timestamp = registration.remove(nodeId);
+ if (timestamp != null) {
+ LOG.debug("Unregistered for next consistent operational: {}", nodeId.getValue());
+ }
+ return timestamp;
+ }
+
+ public boolean isRegistered(NodeId nodeId) {
+ return registration.get(nodeId) != null;
+ }
+
+ public Date getRegistration(NodeId nodeId) {
+ return registration.get(nodeId);
+ }
+
+}
--- /dev/null
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.util;
+
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+
+/**
+ * Simple compression queue entry for {@link org.opendaylight.openflowplugin.applications.frsync.impl.SyncReactorFutureZipDecorator}.
+ */
+public class ZipQueueEntry {
+ private FlowCapableNode after;
+ private FlowCapableNode before;
+ private LogicalDatastoreType dsTypeBefore;
+
+ public ZipQueueEntry(final FlowCapableNode after, final FlowCapableNode before, final LogicalDatastoreType dsTypeBefore) {
+ this.after = after;
+ this.before = before;
+ this.dsTypeBefore = dsTypeBefore;
+
+ }
+
+ public FlowCapableNode getLeft() {
+ return after;
+ }
+
+ public FlowCapableNode getRight() {
+ return before;
+ }
+
+ public LogicalDatastoreType getDsType() {
+ return dsTypeBefore;
+ }
+
+}
@RunWith(MockitoJUnitRunner.class)
public class SimplifiedConfigListenerTest {
+ private static final NodeId NODE_ID = new NodeId("testNode");
+ private InstanceIdentifier<FlowCapableNode> fcNodePath;
+ private SimplifiedConfigListener nodeListenerConfig;
+ private LogicalDatastoreType dsType = LogicalDatastoreType.CONFIGURATION;
+
@Mock
private SyncReactor reactor;
@Mock
- private DataBroker db;
+ private ReadOnlyTransaction roTx;
@Mock
private DataTreeModification<FlowCapableNode> dataTreeModification;
@Mock
- private ReadOnlyTransaction roTx;
- @Mock
private DataObjectModification<FlowCapableNode> configModification;
-
- private InstanceIdentifier<FlowCapableNode> nodePath;
- private SimplifiedConfigListener nodeListenerConfig;
+ @Mock
+ private FlowCapableNode dataBefore;
+ @Mock
+ private FlowCapableNode dataAfter;
@Before
public void setUp() throws Exception {
- final FlowCapableNodeSnapshotDao configSnaphot = new FlowCapableNodeSnapshotDao();
- final FlowCapableNodeSnapshotDao operationalSnaphot = new FlowCapableNodeSnapshotDao();
- final FlowCapableNodeDao operationalDao = new FlowCapableNodeCachedDao(operationalSnaphot,
+ final DataBroker db = Mockito.mock(DataBroker.class);
+ final FlowCapableNodeSnapshotDao configSnapshot = new FlowCapableNodeSnapshotDao();
+ final FlowCapableNodeSnapshotDao operationalSnapshot = new FlowCapableNodeSnapshotDao();
+ final FlowCapableNodeDao operationalDao = new FlowCapableNodeCachedDao(operationalSnapshot,
new FlowCapableNodeOdlDao(db, LogicalDatastoreType.OPERATIONAL));
-
- nodeListenerConfig = new SimplifiedConfigListener(reactor, configSnaphot, operationalDao);
- nodePath = InstanceIdentifier.create(Nodes.class)
- .child(Node.class, new NodeKey(new NodeId("testNode")))
+ nodeListenerConfig = new SimplifiedConfigListener(reactor, configSnapshot, operationalDao);
+ fcNodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID))
.augmentation(FlowCapableNode.class);
+
+ final DataTreeIdentifier<FlowCapableNode> dataTreeIdentifier =
+ new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, fcNodePath);
+
+ Mockito.when(db.newReadOnlyTransaction()).thenReturn(roTx);
+ Mockito.when(dataTreeModification.getRootPath()).thenReturn(dataTreeIdentifier);
+ Mockito.when(dataTreeModification.getRootNode()).thenReturn(configModification);
+ Mockito.when(reactor.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
+ Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenReturn(Futures.immediateFuture(Boolean.TRUE));
}
@Test
}
@Test
- public void testOnDataTreeChanged() throws Exception {
- final FlowCapableNode configTree = Mockito.mock(FlowCapableNode.class);
- final FlowCapableNode operationalTree = Mockito.mock(FlowCapableNode.class);
- final DataTreeIdentifier<FlowCapableNode> dataTreeIdentifier =
- new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, nodePath);
+ public void testOnDataTreeChangedSyncupAdd() throws InterruptedException {
+ Mockito.when(roTx.read(LogicalDatastoreType.OPERATIONAL, fcNodePath))
+ .thenReturn(Futures.immediateCheckedFuture(Optional.of(dataBefore)));
+ Mockito.when(configModification.getDataAfter()).thenReturn(dataAfter);
- Mockito.when(dataTreeModification.getRootPath()).thenReturn(dataTreeIdentifier);
- Mockito.when(dataTreeModification.getRootNode()).thenReturn(configModification);
- Mockito.when(configModification.getDataAfter()).thenReturn(configTree);
- Mockito.when(db.newReadOnlyTransaction()).thenReturn(roTx);
- Mockito.doReturn(Futures.immediateCheckedFuture(Optional.of(operationalTree))).when(
- roTx).read(LogicalDatastoreType.OPERATIONAL, nodePath);
- Mockito.when(reactor.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(),Matchers.<FlowCapableNode>any(),Matchers.<FlowCapableNode>any()))
- .thenReturn(Futures.immediateFuture(Boolean.TRUE));
+ nodeListenerConfig.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+ Mockito.verify(reactor).syncup(fcNodePath, dataAfter, dataBefore, dsType);
+ Mockito.verifyNoMoreInteractions(reactor);
+ Mockito.verify(roTx).close();
+ }
+
+ @Test
+ public void testOnDataTreeChangedSyncupUpdate() throws InterruptedException {
+ Mockito.when(roTx.read(LogicalDatastoreType.OPERATIONAL, fcNodePath))
+ .thenReturn(Futures.immediateCheckedFuture(Optional.of(dataBefore)));
+ Mockito.when(configModification.getDataBefore()).thenReturn(dataBefore);
+ Mockito.when(configModification.getDataAfter()).thenReturn(dataAfter);
+
+ nodeListenerConfig.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+ Mockito.verify(reactor).syncup(fcNodePath, dataAfter, dataBefore, dsType);
+ Mockito.verifyNoMoreInteractions(reactor);
+ Mockito.verify(roTx).close();
+ }
+
+ @Test
+ public void testOnDataTreeChangedSyncupDelete() throws InterruptedException {
+ Mockito.when(roTx.read(LogicalDatastoreType.OPERATIONAL, fcNodePath))
+ .thenReturn(Futures.immediateCheckedFuture(Optional.of(dataBefore)));
+ Mockito.when(configModification.getDataBefore()).thenReturn(dataBefore);
+
+ nodeListenerConfig.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+ Mockito.verify(reactor).syncup(fcNodePath, null, dataBefore, dsType);
+ Mockito.verifyNoMoreInteractions(reactor);
+ Mockito.verify(roTx).close();
+ }
+
+ @Test
+ public void testOnDataTreeChangedSkip() {
+ Mockito.when(roTx.read(LogicalDatastoreType.OPERATIONAL, fcNodePath)).
+ thenReturn(Futures.immediateCheckedFuture(Optional.absent()));
nodeListenerConfig.onDataTreeChanged(Collections.singleton(dataTreeModification));
- Mockito.verify(reactor).syncup(nodePath, configTree, operationalTree);
+ Mockito.verifyZeroInteractions(reactor);
Mockito.verify(roTx).close();
}
}
\ No newline at end of file
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures;
import java.util.Collections;
+import java.util.List;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.runners.MockitoJUnitRunner;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType;
import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
@RunWith(MockitoJUnitRunner.class)
public class SimplifiedOperationalListenerTest {
- public static final NodeId NODE_ID = new NodeId("testNode");
+ private static final NodeId NODE_ID = new NodeId("testNode");
+ private InstanceIdentifier<FlowCapableNode> fcNodePath;
+ private SimplifiedOperationalListener nodeListenerOperational;
+ private final LogicalDatastoreType dsType = LogicalDatastoreType.OPERATIONAL;
+
@Mock
private SyncReactor reactor;
@Mock
@Mock
private DataTreeModification<Node> dataTreeModification;
@Mock
+ private DataObjectModification<Node> operationalModification;
+ @Mock
private FlowCapableNode configNode;
@Mock
+ private Node operationalNode;
+ @Mock
private FlowCapableNode fcOperationalNode;
- private InstanceIdentifier<Node> nodePath;
- private InstanceIdentifier<FlowCapableNode> fcNodePath;
- private SimplifiedOperationalListener nodeListenerOperational;
-
@Before
public void setUp() throws Exception {
final DataBroker db = Mockito.mock(DataBroker.class);
- final DataObjectModification<Node> operationalModification = Mockito.mock(DataObjectModification.class);
- final Node operationalNode = Mockito.mock(Node.class);
-
- final FlowCapableNodeSnapshotDao configSnaphot = new FlowCapableNodeSnapshotDao();
- final FlowCapableNodeSnapshotDao operationalSnaphot = new FlowCapableNodeSnapshotDao();
- final FlowCapableNodeDao configDao = new FlowCapableNodeCachedDao(configSnaphot,
+ final FlowCapableNodeSnapshotDao configSnapshot = new FlowCapableNodeSnapshotDao();
+ final FlowCapableNodeSnapshotDao operationalSnapshot = new FlowCapableNodeSnapshotDao();
+ final FlowCapableNodeDao configDao = new FlowCapableNodeCachedDao(configSnapshot,
new FlowCapableNodeOdlDao(db, LogicalDatastoreType.CONFIGURATION));
- nodeListenerOperational = new SimplifiedOperationalListener(reactor, operationalSnaphot, configDao);
- nodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID));
+
+ nodeListenerOperational = new SimplifiedOperationalListener(reactor, operationalSnapshot, configDao);
+ InstanceIdentifier<Node> nodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID));
fcNodePath = nodePath.augmentation(FlowCapableNode.class);
final DataTreeIdentifier<Node> dataTreeIdentifier =
new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, nodePath);
- Mockito.when(operationalNode.getId()).thenReturn(NODE_ID);
Mockito.when(db.newReadOnlyTransaction()).thenReturn(roTx);
- Mockito.when(operationalNode.getAugmentation(FlowCapableNode.class)).thenReturn(fcOperationalNode);
+ Mockito.when(operationalNode.getId()).thenReturn(NODE_ID);
Mockito.when(dataTreeModification.getRootPath()).thenReturn(dataTreeIdentifier);
Mockito.when(dataTreeModification.getRootNode()).thenReturn(operationalModification);
- Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
+ Mockito.when(operationalNode.getAugmentation(FlowCapableNode.class)).thenReturn(fcOperationalNode);
}
@Test
}
@Test
- public void testOnDataTreeChangedSyncup() throws Exception {
- Mockito.when(reactor.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(),Matchers.<FlowCapableNode>any(),
- Matchers.<FlowCapableNode>any())).thenReturn(Futures.immediateFuture(Boolean.TRUE));
+ public void testOnDataTreeChangedSyncupAdd() throws InterruptedException {
Mockito.when(roTx.read(LogicalDatastoreType.CONFIGURATION, fcNodePath))
.thenReturn(Futures.immediateCheckedFuture(Optional.of(configNode)));
+ Mockito.when(reactor.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
+ Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenReturn(Futures.immediateFuture(Boolean.TRUE));
+ Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
- Mockito.verify(reactor, Mockito.times(1)).syncup(fcNodePath, configNode, fcOperationalNode);
+ Mockito.verify(reactor).syncup(fcNodePath, configNode, fcOperationalNode, dsType);
+ Mockito.verifyNoMoreInteractions(reactor);
Mockito.verify(roTx).close();
}
@Test
- public void testOnDataTreeChangedSkip() throws Exception {
+ public void testOnDataTreeChangedAddSkip() {
// Related to bug 5920 -> https://bugs.opendaylight.org/show_bug.cgi?id=5920
Mockito.when(roTx.read(LogicalDatastoreType.CONFIGURATION, fcNodePath))
.thenReturn(Futures.immediateCheckedFuture(Optional.absent()));
+ Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
Mockito.verifyZeroInteractions(reactor);
Mockito.verify(roTx).close();
}
+
+ @Test
+ public void testOnDataTreeChangedSyncupDeletePhysical() {
+ Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+ Mockito.when(dataTreeModification.getRootNode().getModificationType()).thenReturn(ModificationType.DELETE);
+
+ nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+ Mockito.verifyZeroInteractions(reactor);
+ }
+
+ @Test
+ public void testOnDataTreeChangedSyncupDeleteLogical() {
+ Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+ List<NodeConnector> nodeConnectorList = Mockito.mock(List.class);
+ Mockito.when(operationalNode.getNodeConnector()).thenReturn(nodeConnectorList);
+
+ nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+ Mockito.verifyZeroInteractions(reactor);
+ }
}
--- /dev/null
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Futures;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Collections;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
+import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeCachedDao;
+import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
+import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeOdlDao;
+import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
+import org.opendaylight.openflowplugin.applications.frsync.util.RetryRegistry;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.DateAndTime;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEnd;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+/**
+ * Test for {@link SimplifiedOperationalRetryListener}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class SimplifiedOperationalRetryListenerTest {
+
+ private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(RetryRegistry.DATE_AND_TIME_FORMAT);
+ private static final NodeId NODE_ID = new NodeId("testNode");
+ private InstanceIdentifier<FlowCapableNode> fcNodePath;
+ private SimplifiedOperationalRetryListener nodeListenerOperational;
+ private final String timestampBefore = "0000-12-12T01:01:01.000-07:00";
+ private final String timestampAfter = "9999-12-12T01:01:01.000-07:00";
+ private final LogicalDatastoreType dsType = LogicalDatastoreType.OPERATIONAL;
+
+ @Mock
+ private SyncReactor reactor;
+ @Mock
+ private ReadOnlyTransaction roTx;
+ @Mock
+ private DataTreeModification<Node> dataTreeModification;
+ @Mock
+ private DataObjectModification<Node> operationalModification;
+ @Mock
+ private FlowCapableNode configNode;
+ @Mock
+ private Node operationalNode;
+ @Mock
+ private FlowCapableNode fcOperationalNode;
+ @Mock
+ private RetryRegistry retryRegistry;
+ @Mock
+ private FlowCapableStatisticsGatheringStatus statisticsGatheringStatus;
+ @Mock
+ private SnapshotGatheringStatusEnd snapshotGatheringStatusEnd;
+
+ @Before
+ public void setUp() throws Exception {
+ final DataBroker db = Mockito.mock(DataBroker.class);
+ final FlowCapableNodeSnapshotDao configSnapshot = new FlowCapableNodeSnapshotDao();
+ final FlowCapableNodeSnapshotDao operationalSnapshot = new FlowCapableNodeSnapshotDao();
+ final FlowCapableNodeDao configDao = new FlowCapableNodeCachedDao(configSnapshot,
+ new FlowCapableNodeOdlDao(db, LogicalDatastoreType.CONFIGURATION));
+
+ nodeListenerOperational = new SimplifiedOperationalRetryListener(reactor, operationalSnapshot, configDao, retryRegistry);
+ InstanceIdentifier<Node> nodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID));
+ fcNodePath = nodePath.augmentation(FlowCapableNode.class);
+
+ final DataTreeIdentifier<Node> dataTreeIdentifier =
+ new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, nodePath);
+
+ Mockito.when(db.newReadOnlyTransaction()).thenReturn(roTx);
+ Mockito.when(operationalNode.getId()).thenReturn(NODE_ID);
+ Mockito.when(dataTreeModification.getRootPath()).thenReturn(dataTreeIdentifier);
+ Mockito.when(dataTreeModification.getRootNode()).thenReturn(operationalModification);
+ Mockito.when(operationalNode.getAugmentation(FlowCapableNode.class)).thenReturn(fcOperationalNode);
+ }
+
+ @Test
+ public void testOnDataTreeChangedRetryNotRegistered() {
+ Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+ Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
+ Mockito.when(retryRegistry.isRegistered(NODE_ID)).thenReturn(false);
+
+ nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+ Mockito.verifyZeroInteractions(reactor);
+ }
+
+ @Test
+ public void testOnDataTreeChangedRetryButStaticsGatheringNotStarted() {
+ Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+ Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
+ Mockito.when(retryRegistry.isRegistered(NODE_ID)).thenReturn(true);
+ Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(null);
+
+ nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+ Mockito.verifyZeroInteractions(reactor);
+ }
+
+ @Test
+ public void testOnDataTreeChangedRetryButStaticsGatheringNotFinished() {
+ Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+ Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
+ Mockito.when(retryRegistry.isRegistered(NODE_ID)).thenReturn(true);
+ Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(statisticsGatheringStatus);
+ Mockito.when(statisticsGatheringStatus.getSnapshotGatheringStatusEnd()).thenReturn(null);
+
+ nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+ Mockito.verifyZeroInteractions(reactor);
+ }
+
+ @Test
+ public void testOnDataTreeChangedRetryButStaticsGatheringNotSuccessful() {
+ Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+ Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
+ Mockito.when(retryRegistry.isRegistered(NODE_ID)).thenReturn(true);
+ Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(statisticsGatheringStatus);
+ Mockito.when(statisticsGatheringStatus.getSnapshotGatheringStatusEnd()).thenReturn(snapshotGatheringStatusEnd);
+ Mockito.when(snapshotGatheringStatusEnd.isSucceeded()).thenReturn(false);
+
+ nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+ Mockito.verifyZeroInteractions(reactor);
+ }
+
+ @Test
+ public void testOnDataTreeChangedRetryAndFreshOperationalNotPresent() throws ParseException {
+ final DateAndTime timestamp = Mockito.mock(DateAndTime.class);
+ Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+ Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
+ Mockito.when(retryRegistry.isRegistered(NODE_ID)).thenReturn(true);
+ Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(statisticsGatheringStatus);
+ Mockito.when(statisticsGatheringStatus.getSnapshotGatheringStatusEnd()).thenReturn(snapshotGatheringStatusEnd);
+ Mockito.when(snapshotGatheringStatusEnd.isSucceeded()).thenReturn(true);
+ Mockito.when(snapshotGatheringStatusEnd.getEnd()).thenReturn(timestamp);
+ Mockito.when(snapshotGatheringStatusEnd.getEnd().getValue()).thenReturn(timestampBefore);
+ Mockito.when(retryRegistry.getRegistration(NODE_ID)).thenReturn(simpleDateFormat.parse(timestampAfter));
+
+ nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+ Mockito.verifyZeroInteractions(reactor);
+ }
+
+ @Test
+ public void testOnDataTreeChangedRetryAndFreshOperationalPresent() throws Exception {
+ final DateAndTime timestamp = Mockito.mock(DateAndTime.class);
+ Mockito.when(roTx.read(LogicalDatastoreType.CONFIGURATION, fcNodePath))
+ .thenReturn(Futures.immediateCheckedFuture(Optional.of(configNode)));
+ Mockito.when(reactor.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
+ Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenReturn(Futures.immediateFuture(Boolean.TRUE));
+ Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+ Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
+ Mockito.when(retryRegistry.isRegistered(NODE_ID)).thenReturn(true);
+ Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(statisticsGatheringStatus);
+ Mockito.when(statisticsGatheringStatus.getSnapshotGatheringStatusEnd()).thenReturn(snapshotGatheringStatusEnd);
+ Mockito.when(snapshotGatheringStatusEnd.isSucceeded()).thenReturn(true);
+ Mockito.when(snapshotGatheringStatusEnd.getEnd()).thenReturn(timestamp);
+ Mockito.when(snapshotGatheringStatusEnd.getEnd().getValue()).thenReturn(timestampAfter);
+ Mockito.when(retryRegistry.getRegistration(NODE_ID)).thenReturn(simpleDateFormat.parse(timestampBefore));
+
+ nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+ Mockito.verify(reactor).syncup(fcNodePath, configNode, fcOperationalNode, dsType);
+ Mockito.verifyNoMoreInteractions(reactor);
+ Mockito.verify(roTx).close();
+ }
+
+ @Test
+ public void testOnDataTreeChangedRetryAndNodeDeleted() {
+ Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+ Mockito.when(dataTreeModification.getRootNode().getModificationType()).thenReturn(DataObjectModification.ModificationType.DELETE);
+
+ nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+ Mockito.verify(retryRegistry).unregisterIfRegistered(NODE_ID);
+ Mockito.verifyZeroInteractions(reactor);
+ }
+
+}
\ No newline at end of file
--- /dev/null
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InOrder;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test for {@link SyncReactorFutureZipDecorator}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class SyncReactorFutureZipDecoratorTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureZipDecoratorTest.class);
+ private static final NodeId NODE_ID = new NodeId("testNode");
+ private SyncReactorFutureZipDecorator reactor;
+ private InstanceIdentifier<FlowCapableNode> fcNodePath;
+ private ListeningExecutorService syncThreadPool;
+
+ @Mock
+ private SyncReactorGuardDecorator delegate;
+
+ @Before
+ public void setUp() {
+ syncThreadPool = FrmExecutors.instance()
+ .newFixedThreadPool(1, new ThreadFactoryBuilder()
+ .setNameFormat(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX + "%d")
+ .setDaemon(false)
+ .build());
+
+ reactor = new SyncReactorFutureZipDecorator(delegate, syncThreadPool);
+ fcNodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID))
+ .augmentation(FlowCapableNode.class);
+ }
+
+ @Test
+ public void testSyncupWithOptimizedConfigDeltaCompression() throws Exception {
+ final FlowCapableNode dataBefore = Mockito.mock(FlowCapableNode.class);
+ final FlowCapableNode dataAfter = Mockito.mock(FlowCapableNode.class);
+ final FlowCapableNode dataAfter2 = Mockito.mock(FlowCapableNode.class);
+ final CountDownLatch latchForFirst = new CountDownLatch(1);
+ final CountDownLatch latchForNext = new CountDownLatch(1);
+ final LogicalDatastoreType dsType = LogicalDatastoreType.CONFIGURATION;
+
+ Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
+ Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenAnswer(new Answer<ListenableFuture<Boolean>>() {
+ @Override
+ public ListenableFuture<Boolean> answer(final InvocationOnMock invocationOnMock) throws Throwable {
+ LOG.info("unlocking next configs");
+ latchForNext.countDown();
+ latchForFirst.await();
+ LOG.info("unlocking first delegate");
+ return Futures.immediateFuture(Boolean.TRUE);
+ }
+ }).thenReturn(Futures.immediateFuture(Boolean.TRUE));
+
+ final List<ListenableFuture<Boolean>> allResults = new ArrayList<>();
+ allResults.add(reactor.syncup(fcNodePath, dataBefore, null, dsType));
+ latchForNext.await();
+
+ allResults.add(reactor.syncup(fcNodePath, dataAfter, dataBefore, dsType));
+ allResults.add(reactor.syncup(fcNodePath, null, dataAfter, dsType));
+ allResults.add(reactor.syncup(fcNodePath, dataAfter2, null, dsType));
+ latchForFirst.countDown();
+
+ Futures.allAsList(allResults).get(1, TimeUnit.SECONDS);
+ LOG.info("all configs done");
+
+ syncThreadPool.shutdown();
+ boolean terminated = syncThreadPool.awaitTermination(1, TimeUnit.SECONDS);
+ if (!terminated) {
+ LOG.info("thread pool not terminated.");
+ syncThreadPool.shutdownNow();
+ }
+ final InOrder inOrder = Mockito.inOrder(delegate);
+ inOrder.verify(delegate).syncup(fcNodePath, dataBefore, null, dsType);
+ inOrder.verify(delegate).syncup(fcNodePath, dataAfter2, dataBefore, dsType);
+ inOrder.verifyNoMoreInteractions();
+ }
+
+ @Test
+ public void testSyncupConfigEmptyQueue() throws Exception {
+ final FlowCapableNode dataBefore = Mockito.mock(FlowCapableNode.class);
+ final FlowCapableNode dataAfter = Mockito.mock(FlowCapableNode.class);
+ final CountDownLatch latchForNext = new CountDownLatch(1);
+ final LogicalDatastoreType dsType = LogicalDatastoreType.CONFIGURATION;
+
+ Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
+ Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenAnswer(new Answer<ListenableFuture<Boolean>>() {
+ @Override
+ public ListenableFuture<Boolean> answer(final InvocationOnMock invocationOnMock) throws Throwable {
+ LOG.info("unlocking next config");
+ latchForNext.countDown();
+ return Futures.immediateFuture(Boolean.TRUE);
+ }
+ }).thenReturn(Futures.immediateFuture(Boolean.TRUE));
+
+ reactor.syncup(fcNodePath, dataBefore, null, dsType);
+ latchForNext.await();
+ reactor.syncup(fcNodePath, dataAfter, dataBefore, dsType);
+
+ boolean terminated = syncThreadPool.awaitTermination(1, TimeUnit.SECONDS);
+ if (!terminated) {
+ LOG.info("thread pool not terminated.");
+ syncThreadPool.shutdownNow();
+ }
+ final InOrder inOrder = Mockito.inOrder(delegate);
+ inOrder.verify(delegate).syncup(fcNodePath, dataBefore, null, dsType);
+ inOrder.verify(delegate).syncup(fcNodePath, dataAfter, dataBefore, dsType);
+ inOrder.verifyNoMoreInteractions();
+
+ }
+
+ @Test
+ public void testSyncupRewriteZipEntryWithOperationalDelta() throws Exception {
+ final FlowCapableNode configBefore = Mockito.mock(FlowCapableNode.class);
+ final FlowCapableNode configAfter = Mockito.mock(FlowCapableNode.class);
+ final FlowCapableNode configActual = Mockito.mock(FlowCapableNode.class);
+ final FlowCapableNode freshOperational = Mockito.mock(FlowCapableNode.class);
+ final CountDownLatch latchForFirst = new CountDownLatch(1);
+ final CountDownLatch latchForNext = new CountDownLatch(1);
+
+ Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
+ Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenAnswer(new Answer<ListenableFuture<Boolean>>() {
+ @Override
+ public ListenableFuture<Boolean> answer(final InvocationOnMock invocationOnMock) throws Throwable {
+ LOG.info("unlocking for fresh operational");
+ latchForNext.countDown();
+ latchForFirst.await();
+ LOG.info("unlocking first delegate");
+ return Futures.immediateFuture(Boolean.TRUE);
+ }
+ }).thenReturn(Futures.immediateFuture(Boolean.TRUE));
+
+ reactor.syncup(fcNodePath, configAfter, configBefore, LogicalDatastoreType.CONFIGURATION);
+ latchForNext.await();
+
+ reactor.syncup(fcNodePath, configActual, freshOperational, LogicalDatastoreType.OPERATIONAL);
+ latchForFirst.countDown();
+
+ syncThreadPool.shutdown();
+ boolean terminated = syncThreadPool.awaitTermination(1, TimeUnit.SECONDS);
+ if (!terminated) {
+ LOG.info("thread pool not terminated.");
+ syncThreadPool.shutdownNow();
+ }
+ Mockito.verify(delegate, Mockito.times(1)).syncup(fcNodePath, configActual, freshOperational, LogicalDatastoreType.OPERATIONAL);
+ }
+
+ @After
+ public void tearDown() {
+ syncThreadPool.shutdownNow();
+ }
+}
\ No newline at end of file
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
private static final NodeId NODE_ID = new NodeId("test-node");
private SyncReactorGuardDecorator reactor;
private InstanceIdentifier<FlowCapableNode> fcNodePath;
+ private final LogicalDatastoreType dsType = LogicalDatastoreType.CONFIGURATION;
@Mock
- private SyncReactorImpl delegate;
+ private SyncReactorRetryDecorator delegate;
@Mock
private FlowCapableNode fcConfigNode;
@Mock
@Before
public void setUp() throws Exception {
- reactor = new SyncReactorGuardDecorator(delegate, new SemaphoreKeeperGuavaImpl<InstanceIdentifier<FlowCapableNode>>(1, true));
+ final SemaphoreKeeperGuavaImpl semaphoreKeeper = new SemaphoreKeeperGuavaImpl<InstanceIdentifier<FlowCapableNode>>(1, true);
+ reactor = new SyncReactorGuardDecorator(delegate, semaphoreKeeper);
InstanceIdentifier<Node> nodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID));
fcNodePath = nodePath.augmentation(FlowCapableNode.class);
@Test
public void testSyncupSuccess() throws Exception {
- Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(),Matchers.<FlowCapableNode>any(),
- Matchers.<FlowCapableNode>any())).thenReturn(Futures.immediateFuture(Boolean.TRUE));
- reactor.syncup(fcNodePath, fcConfigNode, fcOperationalNode);
- Mockito.verify(delegate).syncup(fcNodePath, fcConfigNode, fcOperationalNode);
+ Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
+ Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenReturn(Futures.immediateFuture(Boolean.TRUE));
+ reactor.syncup(fcNodePath, fcConfigNode, fcOperationalNode, dsType);
+
+ Mockito.verify(delegate).syncup(fcNodePath, fcConfigNode, fcOperationalNode, dsType);
+ Mockito.verifyNoMoreInteractions(delegate);
}
@Test
public void testSyncupFail() throws Exception {
- Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(),Matchers.<FlowCapableNode>any(),
- Matchers.<FlowCapableNode>any())).thenReturn(Futures.immediateFailedFuture(new Exception()));
- reactor.syncup(fcNodePath, fcConfigNode, fcOperationalNode);
- Mockito.verify(delegate).syncup(fcNodePath, fcConfigNode, fcOperationalNode);
+ Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
+ Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenReturn(Futures.immediateFailedFuture(new Exception()));
+
+ reactor.syncup(fcNodePath, fcConfigNode, fcOperationalNode, dsType);
+
+ Mockito.verify(delegate).syncup(fcNodePath, fcConfigNode, fcOperationalNode, dsType);
+ Mockito.verifyNoMoreInteractions(delegate);
}
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.SynchronizationDiffInput;
import org.opendaylight.openflowplugin.applications.frsync.util.ReconcileUtil;
private static final NodeId NODE_ID = new NodeId("unit-nodeId");
private static final InstanceIdentifier<FlowCapableNode> NODE_IDENT = InstanceIdentifier.create(Nodes.class)
.child(Node.class, new NodeKey(NODE_ID)).augmentation(FlowCapableNode.class);
-
private SyncReactorImpl reactor;
+
@Mock
private DataBroker db;
@Mock
private SyncPlanPushStrategy syncPlanPushStrategy;
-
@Captor
private ArgumentCaptor<Group> groupCaptor;
@Captor
Matchers.<SyncCrudCounters>any()))
.thenReturn(RpcResultBuilder.<Void>success().buildFuture());
- final ListenableFuture<Boolean> syncupResult = reactor.syncup(NODE_IDENT, configFcn, operationalFcn);
+ final ListenableFuture<Boolean> syncupResult = reactor.syncup(NODE_IDENT, configFcn, operationalFcn, LogicalDatastoreType.CONFIGURATION);
try {
Assert.assertTrue(syncupResult.isDone());
final Boolean voidRpcResult = syncupResult.get(2, TimeUnit.SECONDS);
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl;
+
+import com.google.common.util.concurrent.Futures;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.openflowplugin.applications.frsync.util.RetryRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+/**
+ * Test for {@link SyncReactorRetryDecorator}
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class SyncReactorRetryDecoratorTest {
+
+ private static final NodeId NODE_ID = new NodeId("test-node");
+ private SyncReactorRetryDecorator reactor;
+ private InstanceIdentifier<FlowCapableNode> fcNodePath;
+ final LogicalDatastoreType dsType = LogicalDatastoreType.CONFIGURATION;
+
+ @Mock
+ private SyncReactorImpl delegate;
+ @Mock
+ private RetryRegistry retryRegistry;
+ @Mock
+ private FlowCapableNode fcConfigNode;
+ @Mock
+ private FlowCapableNode fcOperationalNode;
+
+ @Before
+ public void setUp() {
+ reactor = new SyncReactorRetryDecorator(delegate, retryRegistry);
+ InstanceIdentifier<Node> nodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID));
+ fcNodePath = nodePath.augmentation(FlowCapableNode.class);
+
+ final Node operationalNode = Mockito.mock(Node.class);
+ Mockito.when(operationalNode.getId()).thenReturn(NODE_ID);
+ Mockito.when(operationalNode.getAugmentation(FlowCapableNode.class)).thenReturn(fcOperationalNode);
+ }
+
+ @Test
+ public void testSyncupSuccess() throws InterruptedException {
+ Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
+ Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenReturn(Futures.immediateFuture(Boolean.TRUE));
+
+ reactor.syncup(fcNodePath, fcConfigNode, fcOperationalNode, dsType);
+
+ Mockito.verify(delegate).syncup(fcNodePath, fcConfigNode, fcOperationalNode, dsType);
+ Mockito.verifyNoMoreInteractions(delegate);
+ Mockito.verify(retryRegistry).unregisterIfRegistered(NODE_ID);
+ }
+
+ @Test
+ public void testSyncupFail() throws InterruptedException {
+ Mockito.when(delegate.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
+ Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenReturn(Futures.immediateFuture(Boolean.FALSE));
+
+ reactor.syncup(fcNodePath, fcConfigNode, fcOperationalNode, dsType);
+
+ Mockito.verify(delegate).syncup(fcNodePath, fcConfigNode, fcOperationalNode, dsType);
+ Mockito.verifyNoMoreInteractions(delegate);
+ Mockito.verify(retryRegistry).register(NODE_ID);
+ }
+
+ @Test
+ public void testSyncupConfigIgnoreInRetry() throws InterruptedException {
+ Mockito.when(retryRegistry.isRegistered(NODE_ID)).thenReturn(true);
+
+ reactor.syncup(fcNodePath, fcConfigNode, fcOperationalNode, dsType);
+
+ Mockito.verifyZeroInteractions(delegate);
+ }
+
+}
\ No newline at end of file
--- /dev/null
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.util;
+
+import java.util.Date;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+
+/**
+ * Test for {@link RetryRegistry}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class RetryRegistryTest {
+
+ private static final NodeId NODE_ID = new NodeId("testNode");
+ private RetryRegistry retryRegistry;
+
+ @Before
+ public void setUp() throws Exception {
+ retryRegistry = new RetryRegistry();
+ }
+
+ @Test
+ public void testRegister() {
+ Date timestamp = retryRegistry.register(NODE_ID);
+ Assert.assertEquals(true, retryRegistry.isRegistered(NODE_ID));
+ Assert.assertNotNull(timestamp);
+ }
+
+ @Test
+ public void testUnregisterIfRegistered() {
+ retryRegistry.register(NODE_ID);
+ Date timestamp = retryRegistry.unregisterIfRegistered(NODE_ID);
+ Assert.assertEquals(false, retryRegistry.isRegistered(NODE_ID));
+ Assert.assertNotNull(timestamp);
+ }
+
+ @Test
+ public void testUnregisterIfNotRegistered() {
+ Date timestamp = retryRegistry.unregisterIfRegistered(NODE_ID);
+ Assert.assertEquals(false, retryRegistry.isRegistered(NODE_ID));
+ Assert.assertNull(timestamp);
+ }
+
+}