* @param identifier - the whole path to new DataObject
* @param add - new DataObject
* @param nodeIdent - Node InstanceIdentifier
+ * @return RpcResult of action
*/
Future<RpcResult<A>> add(InstanceIdentifier<D> identifier, D add, InstanceIdentifier<FlowCapableNode> nodeIdent);
* @param identifier - the whole path to DataObject
* @param del - DataObject for removing
* @param nodeIdent - Node InstanceIdentifier
+ * @return RpcResult of action
*/
Future<RpcResult<R>> remove(InstanceIdentifier<D> identifier, D del, InstanceIdentifier<FlowCapableNode> nodeIdent);
* @param original - original DataObject (for update)
* @param update - changed DataObject (contain updates)
* @param nodeIdent - Node InstanceIdentifier
+ * @return RpcResult of action
*/
Future<RpcResult<U>> update(InstanceIdentifier<D> identifier, D original, D update,
InstanceIdentifier<FlowCapableNode> nodeIdent);
* @param operationalTree device reflection
* @param dsType type of DS change
* @return synchronization outcome
+ * @throws InterruptedException
*/
ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
final FlowCapableNode configTree, final FlowCapableNode operationalTree,
Preconditions.checkNotNull(tableKey, "TableKey can not be null or empty!");
Preconditions.checkNotNull(flow, "Flow can not be null or empty!");
if (!tableKey.getId().equals(flow.getTableId())) {
- LOG.warn("TableID in URI tableId={} and in palyload tableId={} is not same.",
+ LOG.warn("TableID in URI tableId={} and in payload tableId={} is not same.",
flow.getTableId(), tableKey.getId());
return false;
}
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
import org.opendaylight.openflowplugin.applications.frsync.NodeListener;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeOdlDao;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.SyncPlanPushStrategyFlatBatchImpl;
-import org.opendaylight.openflowplugin.applications.frsync.util.RetryRegistry;
+import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
.setUncaughtExceptionHandler((thread, e) -> LOG.error("Uncaught exception {}", thread, e))
.build());
syncThreadPool = MoreExecutors.listeningDecorator(executorService);
-
broker.registerProvider(this);
}
@Override
- public void onSessionInitiated(final BindingAwareBroker.ProviderContext providerContext) {
+ public void onSessionInitiated(final ProviderContext providerContext) {
final TableForwarder tableForwarder = new TableForwarder(salTableService);
final SyncPlanPushStrategy syncPlanPushStrategy = new SyncPlanPushStrategyFlatBatchImpl()
.setFlatBatchService(flatBatchService)
.setTableForwarder(tableForwarder);
- final RetryRegistry retryRegistry = new RetryRegistry();
+ final ReconciliationRegistry reconciliationRegistry = new ReconciliationRegistry();
final SyncReactor syncReactorImpl = new SyncReactorImpl(syncPlanPushStrategy);
- final SyncReactor syncReactorRetry = new SyncReactorRetryDecorator(syncReactorImpl, retryRegistry);
+ final SyncReactor syncReactorRetry = new SyncReactorRetryDecorator(syncReactorImpl, reconciliationRegistry);
final SyncReactor syncReactorGuard = new SyncReactorGuardDecorator(syncReactorRetry,
- new SemaphoreKeeperGuavaImpl<InstanceIdentifier<FlowCapableNode>>(1, true));
+ new SemaphoreKeeperGuavaImpl<>(1, true));
final SyncReactor reactor = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool);
final NodeListener<FlowCapableNode> nodeListenerConfig =
new SimplifiedConfigListener(reactor, configSnapshot, operationalDao);
final NodeListener<Node> nodeListenerOperational =
- new SimplifiedOperationalRetryListener(reactor, operationalSnapshot, configDao, retryRegistry);
+ new SimplifiedOperationalListener(reactor, operationalSnapshot, configDao, reconciliationRegistry);
dataTreeConfigChangeListener =
dataService.registerDataTreeChangeListener(nodeConfigDataTreePath, nodeListenerConfig);
syncThreadPool.shutdown();
}
+
}
public class SimplifiedConfigListener extends AbstractFrmSyncListener<FlowCapableNode> {
private static final Logger LOG = LoggerFactory.getLogger(SimplifiedConfigListener.class);
private final SyncReactor reactor;
- private final FlowCapableNodeSnapshotDao configSnaphot;
+ private final FlowCapableNodeSnapshotDao configSnapshot;
private final FlowCapableNodeDao operationalDao;
- public SimplifiedConfigListener(final SyncReactor reactor, FlowCapableNodeSnapshotDao configSnaphot,
- FlowCapableNodeDao operationalDao) {
+ public SimplifiedConfigListener(final SyncReactor reactor,
+ final FlowCapableNodeSnapshotDao configSnapshot,
+ final FlowCapableNodeDao operationalDao) {
this.reactor = reactor;
- this.configSnaphot = configSnaphot;
+ this.configSnapshot = configSnapshot;
this.operationalDao = operationalDao;
}
final InstanceIdentifier<FlowCapableNode> nodePath = modification.getRootPath().getRootIdentifier();
final NodeId nodeId = PathUtil.digNodeId(nodePath);
- configSnaphot.updateCache(nodeId, Optional.fromNullable(modification.getRootNode().getDataAfter()));
+ configSnapshot.updateCache(nodeId, Optional.fromNullable(modification.getRootNode().getDataAfter()));
final Optional<FlowCapableNode> operationalNode = operationalDao.loadByNodeId(nodeId);
FlowCapableNode dataAfter, FlowCapableNode operationalNode) throws InterruptedException {
NodeId nodeId = PathUtil.digNodeId(nodePath);
LOG.trace("onNodeAdded {}", nodeId);
- final ListenableFuture<Boolean> endResult = reactor.syncup(nodePath, dataAfter, operationalNode, dsType());
- return endResult;
+ return reactor.syncup(nodePath, dataAfter, operationalNode, dsType());
}
/**
FlowCapableNode dataBefore, FlowCapableNode dataAfter) throws InterruptedException {
NodeId nodeId = PathUtil.digNodeId(nodePath);
LOG.trace("onNodeUpdated {}", nodeId);
- final ListenableFuture<Boolean> endResult = reactor.syncup(nodePath, dataAfter, dataBefore, dsType());
- return endResult;
+ return reactor.syncup(nodePath, dataAfter, dataBefore, dsType());
}
/**
FlowCapableNode dataBefore) throws InterruptedException {
NodeId nodeId = PathUtil.digNodeId(nodePath);
LOG.trace("onNodeDeleted {}", nodeId);
- final ListenableFuture<Boolean> endResult = reactor.syncup(nodePath, null, dataBefore, dsType());
- return endResult;
+ return reactor.syncup(nodePath, null, dataBefore, dsType());
}
@Override
public LogicalDatastoreType dsType() {
return LogicalDatastoreType.CONFIGURATION;
}
+
}
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
import java.util.Collection;
+import java.util.Date;
import java.util.List;
import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
import org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType;
import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
+import org.opendaylight.openflowplugin.applications.frsync.util.ModificationUtil;
+import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
+import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEnd;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
private final SyncReactor reactor;
private final FlowCapableNodeSnapshotDao operationalSnapshot;
private final FlowCapableNodeDao configDao;
+ private final ReconciliationRegistry reconciliationRegistry;
- public SimplifiedOperationalListener(SyncReactor reactor, FlowCapableNodeSnapshotDao operationalSnapshot,
- FlowCapableNodeDao configDao) {
+ public SimplifiedOperationalListener(final SyncReactor reactor,
+ final FlowCapableNodeSnapshotDao operationalSnapshot,
+ final FlowCapableNodeDao configDao,
+ final ReconciliationRegistry reconciliationRegistry) {
this.reactor = reactor;
this.operationalSnapshot = operationalSnapshot;
this.configDao = configDao;
+ this.reconciliationRegistry = reconciliationRegistry;
}
@Override
public void onDataTreeChanged(Collection<DataTreeModification<Node>> modifications) {
+ // TODO return for clustered listener if not master for device
LOG.trace("Inventory Operational changes {}", modifications.size());
super.onDataTreeChanged(modifications);
}
DataTreeModification<Node> modification) throws InterruptedException {
updateCache(modification);
+ // TODO register cluster service if node added
if (isReconciliationNeeded(modification)) {
return reconciliation(modification);
}
* Remove if delete. Update only if FlowCapableNode Augmentation modified.
*
* @param modification Datastore modification
- * @return true for cache update, false for cache remove
*/
- protected boolean updateCache(DataTreeModification<Node> modification) {
+ private void updateCache(DataTreeModification<Node> modification) {
+ NodeId nodeId = ModificationUtil.nodeId(modification);
if (isDelete(modification) || isDeleteLogical(modification)) {
- operationalSnapshot.updateCache(nodeId(modification), Optional.<FlowCapableNode>absent());
- return false;
+ operationalSnapshot.updateCache(nodeId, Optional.absent());
+ // TODO unregister/close cluster service if node deleted
+ reconciliationRegistry.unregisterIfRegistered(nodeId);
+ return;
}
- operationalSnapshot.updateCache(nodeId(modification), Optional.fromNullable(flowCapableNodeAfter(modification)));
- return true;
+ operationalSnapshot.updateCache(nodeId, Optional.fromNullable(ModificationUtil.flowCapableNodeAfter(modification)));
}
private Optional<ListenableFuture<Boolean>> skipModification(DataTreeModification<Node> modification) {
- LOG.trace("Skipping Inventory Operational modification {}, before {}, after {}", nodeIdValue(modification),
+ LOG.trace("Skipping Inventory Operational modification {}, before {}, after {}",
+ ModificationUtil.nodeIdValue(modification),
modification.getRootNode().getDataBefore() == null ? "null" : "nonnull",
modification.getRootNode().getDataAfter() == null ? "null" : "nonnull");
return Optional.absent();
*/
private boolean isDelete(DataTreeModification<Node> modification) {
if (ModificationType.DELETE == modification.getRootNode().getModificationType()) {
- LOG.trace("Delete {} (physical)", nodeIdValue(modification));
+ LOG.trace("Delete {} (physical)", ModificationUtil.nodeIdValue(modification));
return true;
}
private boolean isDeleteLogical(DataTreeModification<Node> modification) {
final DataObjectModification<Node> rootNode = modification.getRootNode();
if (!safeConnectorsEmpty(rootNode.getDataBefore()) && safeConnectorsEmpty(rootNode.getDataAfter())) {
- LOG.trace("Delete {} (logical)", nodeIdValue(modification));
+ LOG.trace("Delete {} (logical)", ModificationUtil.nodeIdValue(modification));
return true;
}
final boolean nodeAppearedInOperational = dataBefore == null && dataAfter != null;
if (nodeAppearedInOperational) {
- LOG.trace("Add {} (physical)", nodeIdValue(modification));
+ LOG.trace("Add {} (physical)", ModificationUtil.nodeIdValue(modification));
}
return nodeAppearedInOperational;
}
private boolean isAddLogical(DataTreeModification<Node> modification) {
final DataObjectModification<Node> rootNode = modification.getRootNode();
if (safeConnectorsEmpty(rootNode.getDataBefore()) && !safeConnectorsEmpty(rootNode.getDataAfter())) {
- LOG.trace("Add {} (logical)", nodeIdValue(modification));
+ LOG.trace("Add {} (logical)", ModificationUtil.nodeIdValue(modification));
return true;
}
return false;
}
- protected boolean isReconciliationNeeded(DataTreeModification<Node> modification) {
- return isAdd(modification) || isAddLogical(modification);
+ private boolean isReconciliationNeeded(DataTreeModification<Node> modification) {
+ return isAdd(modification) || isAddLogical(modification) || isRegisteredAndConsistentForReconcile(modification);
}
private Optional<ListenableFuture<Boolean>> reconciliation(DataTreeModification<Node> modification) throws InterruptedException {
- final NodeId nodeId = nodeId(modification);
+ final NodeId nodeId = ModificationUtil.nodeId(modification);
final Optional<FlowCapableNode> nodeConfiguration = configDao.loadByNodeId(nodeId);
if (nodeConfiguration.isPresent()) {
LOG.debug("Reconciliation: {}", nodeId.getValue());
final InstanceIdentifier<FlowCapableNode> nodePath = InstanceIdentifier.create(Nodes.class)
- .child(Node.class, new NodeKey(nodeId(modification))).augmentation(FlowCapableNode.class);
- return Optional.of(reactor.syncup(nodePath, nodeConfiguration.get(), flowCapableNodeAfter(modification), dsType()));
+ .child(Node.class, new NodeKey(ModificationUtil.nodeId(modification)))
+ .augmentation(FlowCapableNode.class);
+ final FlowCapableNode fcNode = ModificationUtil.flowCapableNodeAfter(modification);
+ return Optional.of(reactor.syncup(nodePath, nodeConfiguration.get(), fcNode, dsType()));
} else {
return skipModification(modification);
}
}
- private static FlowCapableNode flowCapableNodeAfter(DataTreeModification<Node> modification) {
- final Node dataAfter = modification.getRootNode().getDataAfter();
- if (dataAfter == null) {
- return null;
- }
- return dataAfter.getAugmentation(FlowCapableNode.class);
- }
+ private boolean isRegisteredAndConsistentForReconcile(DataTreeModification<Node> modification) {
+ final NodeId nodeId = PathUtil.digNodeId(modification.getRootPath().getRootIdentifier());
- private static boolean safeConnectorsEmpty(Node node) {
- if (node == null) {
- return true;
+ if (!reconciliationRegistry.isRegistered(nodeId)) {
+ return false;
}
- final List<NodeConnector> nodeConnectors = node.getNodeConnector();
+ final FlowCapableStatisticsGatheringStatus gatheringStatus = modification.getRootNode().getDataAfter()
+ .getAugmentation(FlowCapableStatisticsGatheringStatus.class);
- return nodeConnectors == null || nodeConnectors.isEmpty();
- }
+ if (gatheringStatus == null) {
+ LOG.trace("Statistics gathering never started for: {}", nodeId.getValue());
+ return false;
+ }
- private static String nodeIdValue(DataTreeModification<Node> modification) {
- final NodeId nodeId = nodeId(modification);
+ final SnapshotGatheringStatusEnd gatheringStatusEnd = gatheringStatus.getSnapshotGatheringStatusEnd();
- if (nodeId == null) {
- return null;
+ if (gatheringStatusEnd == null) {
+ LOG.trace("Statistics gathering is not over yet for: {}", nodeId.getValue());
+ return false;
}
- return nodeId.getValue();
- }
-
- static NodeId nodeId(DataTreeModification<Node> modification) {
- final DataObjectModification<Node> rootNode = modification.getRootNode();
- final Node dataAfter = rootNode.getDataAfter();
+ if (!gatheringStatusEnd.isSucceeded()) {
+ LOG.debug("Statistics gathering was not successful for: {}", nodeId.getValue());
+ return false;
+ }
- if (dataAfter != null) {
- return dataAfter.getId();
+ try {
+ Date timestampOfRegistration = reconciliationRegistry.getRegistration(nodeId);
+ final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(ReconciliationRegistry.DATE_AND_TIME_FORMAT);
+ Date timestampOfStatistics = simpleDateFormat.parse(gatheringStatusEnd.getEnd().getValue());
+ if (timestampOfStatistics.after(timestampOfRegistration)) {
+ LOG.debug("Fresh operational present for: {} -> going retry!", nodeId.getValue());
+ return true;
+ }
+ } catch (ParseException e) {
+ LOG.error("Timestamp parsing error {}", e);
}
+ LOG.debug("Fresh operational not present for: {}", nodeId.getValue());
+ return false;
+ }
- final Node dataBefore = rootNode.getDataBefore();
- if (dataBefore != null) {
- return dataBefore.getId();
+ private static boolean safeConnectorsEmpty(Node node) {
+ if (node == null) {
+ return true;
}
- return null;
+ final List<NodeConnector> nodeConnectors = node.getNodeConnector();
+
+ return nodeConnectors == null || nodeConnectors.isEmpty();
}
@Override
public LogicalDatastoreType dsType() {
return LogicalDatastoreType.OPERATIONAL;
}
+
}
+++ /dev/null
-/**
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.openflowplugin.applications.frsync.impl;
-
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
-import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
-import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
-import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
-import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
-import org.opendaylight.openflowplugin.applications.frsync.util.RetryRegistry;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEnd;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Modified {@link SimplifiedOperationalListener} for usage of retry mechanism.
- */
-public class SimplifiedOperationalRetryListener extends SimplifiedOperationalListener {
-
- private static final Logger LOG = LoggerFactory.getLogger(SimplifiedOperationalRetryListener.class);
- private final RetryRegistry retryRegistry;
-
- public SimplifiedOperationalRetryListener(SyncReactor reactor, FlowCapableNodeSnapshotDao operationalSnapshot,
- FlowCapableNodeDao configDao, RetryRegistry retryRegistry) {
- super(reactor, operationalSnapshot, configDao);
- this.retryRegistry = retryRegistry;
- }
-
- /**
- * Adding condition check for retry.
- *
- * @param modification operational datastore modification
- * @return true if reconciliation is needed, false otherwise
- */
- protected boolean isReconciliationNeeded(DataTreeModification<Node> modification) {
- return super.isReconciliationNeeded(modification) || isRegisteredAndConsistentForRetry(modification);
- }
-
- /**
- * If node is removed unregister for retry in addition.
- *
- * @param modification operational datastore modification
- * @return true for cache update, false for cache remove and retry unregister
- */
- protected boolean updateCache(DataTreeModification<Node> modification) {
- boolean nodeUpdated = super.updateCache(modification);
- if (!nodeUpdated) { // node removed if not updated
- retryRegistry.unregisterIfRegistered(nodeId(modification));
- }
- return nodeUpdated;
- }
-
- /**
- * Check if retry should be proceeded.
- *
- * @param modification operational modification
- * @return true if device is registered for retry and actual modification is consistent, false otherwise
- */
- private boolean isRegisteredAndConsistentForRetry(DataTreeModification<Node> modification) {
- final NodeId nodeId = PathUtil.digNodeId(modification.getRootPath().getRootIdentifier());
-
- if (!retryRegistry.isRegistered(nodeId)) {
- return false;
- }
-
- final FlowCapableStatisticsGatheringStatus gatheringStatus = modification.getRootNode().getDataAfter()
- .getAugmentation(FlowCapableStatisticsGatheringStatus.class);
-
- if (gatheringStatus == null) {
- LOG.trace("Statistics gathering never started for: {}", nodeId.getValue());
- return false;
- }
-
- final SnapshotGatheringStatusEnd gatheringStatusEnd = gatheringStatus.getSnapshotGatheringStatusEnd();
-
- if (gatheringStatusEnd == null) {
- LOG.trace("Statistics gathering is not over yet for: {}", nodeId.getValue());
- return false;
- }
-
- if (!gatheringStatusEnd.isSucceeded()) {
- LOG.debug("Statistics gathering was not successful for: {}", nodeId.getValue());
- return false;
- }
-
- try {
- Date timestampOfRegistration = retryRegistry.getRegistration(nodeId);
- final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(RetryRegistry.DATE_AND_TIME_FORMAT);
- Date timestampOfStatistics = simpleDateFormat.parse(gatheringStatusEnd.getEnd().getValue());
- if (timestampOfStatistics.after(timestampOfRegistration)) {
- LOG.debug("Fresh operational present for: {} -> going retry!", nodeId.getValue());
- return true;
- }
- } catch (ParseException e) {
- LOG.error("Timestamp parsing error {}", e);
- }
- LOG.debug("Fresh operational not present for: {}", nodeId.getValue());
- return false;
- }
-}
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
-import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
LOG.trace("syncup future {}", nodeId.getValue());
- final ListenableFuture<Boolean> syncup = executorService.submit(new Callable<Boolean>() {
- public Boolean call() throws Exception {
- final String oldThreadName = updateThreadName(nodeId);
+ final ListenableFuture<Boolean> syncup = executorService.submit(() -> {
+ final String oldThreadName = updateThreadName(nodeId);
- try {
- final Boolean ret = doSyncupInFuture(flowcapableNodePath, configTree, operationalTree, dsType)
- .get(10000, TimeUnit.MILLISECONDS);
- LOG.trace("ret {} {}", nodeId.getValue(), ret);
- return true;
- } catch (TimeoutException e) {
- LOG.error("doSyncupInFuture timeout occured {}", nodeId.getValue(), e);
- return false;
- } finally {
- updateThreadName(oldThreadName);
- }
+ try {
+ final Boolean ret = doSyncupInFuture(flowcapableNodePath, configTree, operationalTree, dsType)
+ .get(10000, TimeUnit.MILLISECONDS);
+ LOG.trace("ret {} {}", nodeId.getValue(), ret);
+ return true;
+ } catch (TimeoutException e) {
+ LOG.error("doSyncupInFuture timeout occured {}", nodeId.getValue(), e);
+ return false;
+ } finally {
+ updateThreadName(oldThreadName);
}
});
return oldName;
}
- private String updateThreadName(String name) {
+ private void updateThreadName(String name) {
final Thread currentThread = Thread.currentThread();
final String oldName = currentThread.getName();
try {
} catch (Exception e) {
LOG.error("failed updating threadName {}", name, e);
}
- return oldName;
}
}
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
-import org.opendaylight.openflowplugin.applications.frsync.util.RetryRegistry;
+import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
private static final Logger LOG = LoggerFactory.getLogger(SyncReactorRetryDecorator.class);
private final SyncReactor delegate;
- private final RetryRegistry retryRegistry;
+ private final ReconciliationRegistry reconciliationRegistry;
- public SyncReactorRetryDecorator(final SyncReactor delegate, RetryRegistry retryRegistry) {
+ public SyncReactorRetryDecorator(final SyncReactor delegate, ReconciliationRegistry reconciliationRegistry) {
this.delegate = delegate;
- this.retryRegistry = retryRegistry;
+ this.reconciliationRegistry = reconciliationRegistry;
}
public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
LOG.trace("syncup retry {}", nodeId.getValue());
- if (dsType == LogicalDatastoreType.CONFIGURATION && retryRegistry.isRegistered(nodeId)) {
+ if (dsType == LogicalDatastoreType.CONFIGURATION && reconciliationRegistry.isRegistered(nodeId)) {
LOG.trace("Config change ignored because device is in retry [{}]", nodeId);
return Futures.immediateFuture(Boolean.FALSE);
}
public Boolean apply(Boolean result) {
LOG.trace("syncup ret in retry {}", result);
if (result) {
- retryRegistry.unregisterIfRegistered(nodeId);
+ reconciliationRegistry.unregisterIfRegistered(nodeId);
return true;
} else {
- retryRegistry.register(nodeId);
+ reconciliationRegistry.register(nodeId);
// TODO elicit statistics gathering if not running actually
// triggerStatisticsGathering(nodeId);
return false;
resultVehicle = Futures.transform(resultVehicle, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
@Override
public ListenableFuture<RpcResult<Void>> apply(final RpcResult<Void> input) throws Exception {
- if (!input.isSuccessful()) {
- //TODO chain errors but not skip processing on first error return Futures.immediateFuture(input);
- //final ListenableFuture<RpcResult<Void>> singleVoidUpdateResult = Futures.transform(
- // Futures.asList Arrays.asList(input, output),
- // ReconcileUtil.<UpdateFlowOutput>createRpcResultCondenser("TODO"));
- }
-
final List<Batch> batchBag = new ArrayList<>();
int batchOrder = 0;
return batchMap;
}
- private int getNextBatchLimit(final PeekingIterator<Batch> inputBatchIterator, final int failureIndexLimit) {
- return inputBatchIterator.hasNext()
- ? inputBatchIterator.peek().getBatchOrder()
- : failureIndexLimit;
- }
-
@VisibleForTesting
static int assembleRemoveFlows(final List<Batch> batchBag, int batchOrder, final Map<TableKey, ItemSyncBox<Flow>> flowItemSyncTableMap) {
// process flow remove
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import org.opendaylight.openflowplugin.applications.frsync.markandsweep.SwitchFlowId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
*/
public class ItemSyncBox<I> {
- private Set<I> itemsToPush = new LinkedHashSet<>();
- private Set<ItemUpdateTuple<I>> itemsToUpdate = new LinkedHashSet<>();
+ private final Set<I> itemsToPush = new LinkedHashSet<>();
+ private final Set<ItemUpdateTuple<I>> itemsToUpdate = new LinkedHashSet<>();
public Set<I> getItemsToPush() {
return itemsToPush;
--- /dev/null
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.util;
+
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+
+/**
+ * Basic {@link DataTreeModification} related tools.
+ */
+public class ModificationUtil {
+ public static String nodeIdValue(DataTreeModification<Node> modification) {
+ final NodeId nodeId = nodeId(modification);
+
+ if (nodeId == null) {
+ return null;
+ }
+
+ return nodeId.getValue();
+ }
+
+ public static NodeId nodeId(DataTreeModification<Node> modification) {
+ final DataObjectModification<Node> rootNode = modification.getRootNode();
+ final Node dataAfter = rootNode.getDataAfter();
+
+ if (dataAfter != null) {
+ return dataAfter.getId();
+ }
+
+ final Node dataBefore = rootNode.getDataBefore();
+ if (dataBefore != null) {
+ return dataBefore.getId();
+ }
+
+ return null;
+ }
+
+ public static FlowCapableNode flowCapableNodeAfter(DataTreeModification<Node> modification) {
+ final Node dataAfter = modification.getRootNode().getDataAfter();
+ if (dataAfter == null) {
+ return null;
+ }
+ return dataAfter.getAugmentation(FlowCapableNode.class);
+ }
+}
package org.opendaylight.openflowplugin.applications.frsync.util;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import java.util.Objects;
import java.util.Set;
import javax.annotation.Nullable;
-import org.opendaylight.openflowplugin.applications.frsync.markandsweep.SwitchFlowId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.GroupActionCase;
import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.Action;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
* @param gatherUpdates check content of pending item if present on device (and create update task eventually)
* @return list of safe synchronization steps
*/
- @VisibleForTesting
- static ItemSyncBox<Flow> resolveFlowDiffsInTable(final List<Flow> flowsConfigured,
+ public static ItemSyncBox<Flow> resolveFlowDiffsInTable(final List<Flow> flowsConfigured,
final Map<SwitchFlowId, Flow> flowOperationalMap,
final boolean gatherUpdates) {
final ItemSyncBox<Flow> flowsSyncBox = new ItemSyncBox<>();
/**
* Holder of registration request for fresh operational.
*/
-public class RetryRegistry {
+public class ReconciliationRegistry {
- private static final Logger LOG = LoggerFactory.getLogger(RetryRegistry.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ReconciliationRegistry.class);
private final Map<NodeId, Date> registration = new ConcurrentHashMap<>();
public static final String DATE_AND_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
public Date register(NodeId nodeId) {
Date timestamp = new Date();
registration.put(nodeId, timestamp);
- if (timestamp != null) {
- LOG.debug("Registered for next consistent operational: {}", nodeId.getValue());
- }
+ LOG.debug("Registered for next consistent operational: {}", nodeId.getValue());
return timestamp;
}
*/
public class SemaphoreKeeperGuavaImpl<K> implements SemaphoreKeeper<K> {
- private LoadingCache<K, Semaphore> semaphoreCache;
+ private final LoadingCache<K, Semaphore> semaphoreCache;
public SemaphoreKeeperGuavaImpl(final int permits, final boolean fair) {
semaphoreCache = CacheBuilder.newBuilder()
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.openflowplugin.applications.frsync.markandsweep;
+package org.opendaylight.openflowplugin.applications.frsync.util;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
private static final NodeId NODE_ID = new NodeId("testNode");
private InstanceIdentifier<FlowCapableNode> fcNodePath;
private SimplifiedConfigListener nodeListenerConfig;
- private LogicalDatastoreType dsType = LogicalDatastoreType.CONFIGURATION;
+ private final LogicalDatastoreType dsType = LogicalDatastoreType.CONFIGURATION;
@Mock
private SyncReactor reactor;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.List;
import org.junit.Assert;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeOdlDao;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
+import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEnd;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
private InstanceIdentifier<FlowCapableNode> fcNodePath;
private SimplifiedOperationalListener nodeListenerOperational;
private final LogicalDatastoreType dsType = LogicalDatastoreType.OPERATIONAL;
+ private final String timestampBefore = "0000-12-12T01:01:01.000-07:00";
+ private final String timestampAfter = "9999-12-12T01:01:01.000-07:00";
+ private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(ReconciliationRegistry.DATE_AND_TIME_FORMAT);
@Mock
private SyncReactor reactor;
private Node operationalNode;
@Mock
private FlowCapableNode fcOperationalNode;
+ @Mock
+ private ReconciliationRegistry reconciliationRegistry;
+ @Mock
+ private FlowCapableStatisticsGatheringStatus statisticsGatheringStatus;
+ @Mock
+ private SnapshotGatheringStatusEnd snapshotGatheringStatusEnd;
@Before
public void setUp() throws Exception {
final FlowCapableNodeDao configDao = new FlowCapableNodeCachedDao(configSnapshot,
new FlowCapableNodeOdlDao(db, LogicalDatastoreType.CONFIGURATION));
- nodeListenerOperational = new SimplifiedOperationalListener(reactor, operationalSnapshot, configDao);
+ nodeListenerOperational = new SimplifiedOperationalListener(reactor, operationalSnapshot, configDao, reconciliationRegistry);
InstanceIdentifier<Node> nodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID));
fcNodePath = nodePath.augmentation(FlowCapableNode.class);
Mockito.verifyZeroInteractions(reactor);
}
+
+ @Test
+ public void testOnDataTreeChangedReconcileNotRegistered() {
+ Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+ Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
+ Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(false);
+
+ nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+ Mockito.verifyZeroInteractions(reactor);
+ }
+
+ @Test
+ public void testOnDataTreeChangedReconcileButStaticsGatheringNotStarted() {
+ Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+ Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
+ Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(true);
+ Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(null);
+
+ nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+ Mockito.verifyZeroInteractions(reactor);
+ }
+
+ @Test
+ public void testOnDataTreeChangedReconcileButStaticsGatheringNotFinished() {
+ Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+ Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
+ Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(true);
+ Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(statisticsGatheringStatus);
+ Mockito.when(statisticsGatheringStatus.getSnapshotGatheringStatusEnd()).thenReturn(null);
+
+ nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+ Mockito.verifyZeroInteractions(reactor);
+ }
+
+ @Test
+ public void testOnDataTreeChangedReconcileButStaticsGatheringNotSuccessful() {
+ Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+ Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
+ Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(true);
+ Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(statisticsGatheringStatus);
+ Mockito.when(statisticsGatheringStatus.getSnapshotGatheringStatusEnd()).thenReturn(snapshotGatheringStatusEnd);
+ Mockito.when(snapshotGatheringStatusEnd.isSucceeded()).thenReturn(false);
+
+ nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+ Mockito.verifyZeroInteractions(reactor);
+ }
+
+ @Test
+ public void testOnDataTreeChangedReconcileAndFreshOperationalNotPresent() throws ParseException {
+ final DateAndTime timestamp = Mockito.mock(DateAndTime.class);
+ Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+ Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
+ Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(true);
+ Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(statisticsGatheringStatus);
+ Mockito.when(statisticsGatheringStatus.getSnapshotGatheringStatusEnd()).thenReturn(snapshotGatheringStatusEnd);
+ Mockito.when(snapshotGatheringStatusEnd.isSucceeded()).thenReturn(true);
+ Mockito.when(snapshotGatheringStatusEnd.getEnd()).thenReturn(timestamp);
+ Mockito.when(snapshotGatheringStatusEnd.getEnd().getValue()).thenReturn(timestampBefore);
+ Mockito.when(reconciliationRegistry.getRegistration(NODE_ID)).thenReturn(simpleDateFormat.parse(timestampAfter));
+
+ nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+ Mockito.verifyZeroInteractions(reactor);
+ }
+
+ @Test
+ public void testOnDataTreeChangedReconcileAndFreshOperationalPresent() throws Exception {
+ final DateAndTime timestamp = Mockito.mock(DateAndTime.class);
+ Mockito.when(roTx.read(LogicalDatastoreType.CONFIGURATION, fcNodePath))
+ .thenReturn(Futures.immediateCheckedFuture(Optional.of(configNode)));
+ Mockito.when(reactor.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
+ Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenReturn(Futures.immediateFuture(Boolean.TRUE));
+ Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+ Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
+ Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(true);
+ Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(statisticsGatheringStatus);
+ Mockito.when(statisticsGatheringStatus.getSnapshotGatheringStatusEnd()).thenReturn(snapshotGatheringStatusEnd);
+ Mockito.when(snapshotGatheringStatusEnd.isSucceeded()).thenReturn(true);
+ Mockito.when(snapshotGatheringStatusEnd.getEnd()).thenReturn(timestamp);
+ Mockito.when(snapshotGatheringStatusEnd.getEnd().getValue()).thenReturn(timestampAfter);
+ Mockito.when(reconciliationRegistry.getRegistration(NODE_ID)).thenReturn(simpleDateFormat.parse(timestampBefore));
+
+ nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+ Mockito.verify(reactor).syncup(fcNodePath, configNode, fcOperationalNode, dsType);
+ Mockito.verifyNoMoreInteractions(reactor);
+ Mockito.verify(roTx).close();
+ }
+
+ @Test
+ public void testOnDataTreeChangedReconcileAndNodeDeleted() {
+ Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
+ Mockito.when(dataTreeModification.getRootNode().getModificationType()).thenReturn(DataObjectModification.ModificationType.DELETE);
+
+ nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
+
+ Mockito.verify(reconciliationRegistry).unregisterIfRegistered(NODE_ID);
+ Mockito.verifyZeroInteractions(reactor);
+ }
}
+++ /dev/null
-/**
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.openflowplugin.applications.frsync.impl;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.Futures;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Collections;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Matchers;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.runners.MockitoJUnitRunner;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
-import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeCachedDao;
-import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
-import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeOdlDao;
-import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
-import org.opendaylight.openflowplugin.applications.frsync.util.RetryRegistry;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableStatisticsGatheringStatus;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.snapshot.gathering.status.grouping.SnapshotGatheringStatusEnd;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-
-/**
- * Test for {@link SimplifiedOperationalRetryListener}.
- */
-@RunWith(MockitoJUnitRunner.class)
-public class SimplifiedOperationalRetryListenerTest {
-
- private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(RetryRegistry.DATE_AND_TIME_FORMAT);
- private static final NodeId NODE_ID = new NodeId("testNode");
- private InstanceIdentifier<FlowCapableNode> fcNodePath;
- private SimplifiedOperationalRetryListener nodeListenerOperational;
- private final String timestampBefore = "0000-12-12T01:01:01.000-07:00";
- private final String timestampAfter = "9999-12-12T01:01:01.000-07:00";
- private final LogicalDatastoreType dsType = LogicalDatastoreType.OPERATIONAL;
-
- @Mock
- private SyncReactor reactor;
- @Mock
- private ReadOnlyTransaction roTx;
- @Mock
- private DataTreeModification<Node> dataTreeModification;
- @Mock
- private DataObjectModification<Node> operationalModification;
- @Mock
- private FlowCapableNode configNode;
- @Mock
- private Node operationalNode;
- @Mock
- private FlowCapableNode fcOperationalNode;
- @Mock
- private RetryRegistry retryRegistry;
- @Mock
- private FlowCapableStatisticsGatheringStatus statisticsGatheringStatus;
- @Mock
- private SnapshotGatheringStatusEnd snapshotGatheringStatusEnd;
-
- @Before
- public void setUp() throws Exception {
- final DataBroker db = Mockito.mock(DataBroker.class);
- final FlowCapableNodeSnapshotDao configSnapshot = new FlowCapableNodeSnapshotDao();
- final FlowCapableNodeSnapshotDao operationalSnapshot = new FlowCapableNodeSnapshotDao();
- final FlowCapableNodeDao configDao = new FlowCapableNodeCachedDao(configSnapshot,
- new FlowCapableNodeOdlDao(db, LogicalDatastoreType.CONFIGURATION));
-
- nodeListenerOperational = new SimplifiedOperationalRetryListener(reactor, operationalSnapshot, configDao, retryRegistry);
- InstanceIdentifier<Node> nodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID));
- fcNodePath = nodePath.augmentation(FlowCapableNode.class);
-
- final DataTreeIdentifier<Node> dataTreeIdentifier =
- new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, nodePath);
-
- Mockito.when(db.newReadOnlyTransaction()).thenReturn(roTx);
- Mockito.when(operationalNode.getId()).thenReturn(NODE_ID);
- Mockito.when(dataTreeModification.getRootPath()).thenReturn(dataTreeIdentifier);
- Mockito.when(dataTreeModification.getRootNode()).thenReturn(operationalModification);
- Mockito.when(operationalNode.getAugmentation(FlowCapableNode.class)).thenReturn(fcOperationalNode);
- }
-
- @Test
- public void testOnDataTreeChangedRetryNotRegistered() {
- Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
- Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
- Mockito.when(retryRegistry.isRegistered(NODE_ID)).thenReturn(false);
-
- nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
-
- Mockito.verifyZeroInteractions(reactor);
- }
-
- @Test
- public void testOnDataTreeChangedRetryButStaticsGatheringNotStarted() {
- Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
- Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
- Mockito.when(retryRegistry.isRegistered(NODE_ID)).thenReturn(true);
- Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(null);
-
- nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
-
- Mockito.verifyZeroInteractions(reactor);
- }
-
- @Test
- public void testOnDataTreeChangedRetryButStaticsGatheringNotFinished() {
- Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
- Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
- Mockito.when(retryRegistry.isRegistered(NODE_ID)).thenReturn(true);
- Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(statisticsGatheringStatus);
- Mockito.when(statisticsGatheringStatus.getSnapshotGatheringStatusEnd()).thenReturn(null);
-
- nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
-
- Mockito.verifyZeroInteractions(reactor);
- }
-
- @Test
- public void testOnDataTreeChangedRetryButStaticsGatheringNotSuccessful() {
- Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
- Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
- Mockito.when(retryRegistry.isRegistered(NODE_ID)).thenReturn(true);
- Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(statisticsGatheringStatus);
- Mockito.when(statisticsGatheringStatus.getSnapshotGatheringStatusEnd()).thenReturn(snapshotGatheringStatusEnd);
- Mockito.when(snapshotGatheringStatusEnd.isSucceeded()).thenReturn(false);
-
- nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
-
- Mockito.verifyZeroInteractions(reactor);
- }
-
- @Test
- public void testOnDataTreeChangedRetryAndFreshOperationalNotPresent() throws ParseException {
- final DateAndTime timestamp = Mockito.mock(DateAndTime.class);
- Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
- Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
- Mockito.when(retryRegistry.isRegistered(NODE_ID)).thenReturn(true);
- Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(statisticsGatheringStatus);
- Mockito.when(statisticsGatheringStatus.getSnapshotGatheringStatusEnd()).thenReturn(snapshotGatheringStatusEnd);
- Mockito.when(snapshotGatheringStatusEnd.isSucceeded()).thenReturn(true);
- Mockito.when(snapshotGatheringStatusEnd.getEnd()).thenReturn(timestamp);
- Mockito.when(snapshotGatheringStatusEnd.getEnd().getValue()).thenReturn(timestampBefore);
- Mockito.when(retryRegistry.getRegistration(NODE_ID)).thenReturn(simpleDateFormat.parse(timestampAfter));
-
- nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
-
- Mockito.verifyZeroInteractions(reactor);
- }
-
- @Test
- public void testOnDataTreeChangedRetryAndFreshOperationalPresent() throws Exception {
- final DateAndTime timestamp = Mockito.mock(DateAndTime.class);
- Mockito.when(roTx.read(LogicalDatastoreType.CONFIGURATION, fcNodePath))
- .thenReturn(Futures.immediateCheckedFuture(Optional.of(configNode)));
- Mockito.when(reactor.syncup(Matchers.<InstanceIdentifier<FlowCapableNode>>any(), Matchers.<FlowCapableNode>any(),
- Matchers.<FlowCapableNode>any(), Matchers.<LogicalDatastoreType>any())).thenReturn(Futures.immediateFuture(Boolean.TRUE));
- Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
- Mockito.when(operationalModification.getDataAfter()).thenReturn(operationalNode);
- Mockito.when(retryRegistry.isRegistered(NODE_ID)).thenReturn(true);
- Mockito.when(operationalNode.getAugmentation(FlowCapableStatisticsGatheringStatus.class)).thenReturn(statisticsGatheringStatus);
- Mockito.when(statisticsGatheringStatus.getSnapshotGatheringStatusEnd()).thenReturn(snapshotGatheringStatusEnd);
- Mockito.when(snapshotGatheringStatusEnd.isSucceeded()).thenReturn(true);
- Mockito.when(snapshotGatheringStatusEnd.getEnd()).thenReturn(timestamp);
- Mockito.when(snapshotGatheringStatusEnd.getEnd().getValue()).thenReturn(timestampAfter);
- Mockito.when(retryRegistry.getRegistration(NODE_ID)).thenReturn(simpleDateFormat.parse(timestampBefore));
-
- nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
-
- Mockito.verify(reactor).syncup(fcNodePath, configNode, fcOperationalNode, dsType);
- Mockito.verifyNoMoreInteractions(reactor);
- Mockito.verify(roTx).close();
- }
-
- @Test
- public void testOnDataTreeChangedRetryAndNodeDeleted() {
- Mockito.when(operationalModification.getDataBefore()).thenReturn(operationalNode);
- Mockito.when(dataTreeModification.getRootNode().getModificationType()).thenReturn(DataObjectModification.ModificationType.DELETE);
-
- nodeListenerOperational.onDataTreeChanged(Collections.singleton(dataTreeModification));
-
- Mockito.verify(retryRegistry).unregisterIfRegistered(NODE_ID);
- Mockito.verifyZeroInteractions(reactor);
- }
-
-}
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.openflowplugin.applications.frsync.util.RetryRegistry;
+import org.opendaylight.openflowplugin.applications.frsync.util.ReconciliationRegistry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
@Mock
private SyncReactorImpl delegate;
@Mock
- private RetryRegistry retryRegistry;
+ private ReconciliationRegistry reconciliationRegistry;
@Mock
private FlowCapableNode fcConfigNode;
@Mock
@Before
public void setUp() {
- reactor = new SyncReactorRetryDecorator(delegate, retryRegistry);
+ reactor = new SyncReactorRetryDecorator(delegate, reconciliationRegistry);
InstanceIdentifier<Node> nodePath = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(NODE_ID));
fcNodePath = nodePath.augmentation(FlowCapableNode.class);
Mockito.verify(delegate).syncup(fcNodePath, fcConfigNode, fcOperationalNode, dsType);
Mockito.verifyNoMoreInteractions(delegate);
- Mockito.verify(retryRegistry).unregisterIfRegistered(NODE_ID);
+ Mockito.verify(reconciliationRegistry).unregisterIfRegistered(NODE_ID);
}
@Test
Mockito.verify(delegate).syncup(fcNodePath, fcConfigNode, fcOperationalNode, dsType);
Mockito.verifyNoMoreInteractions(delegate);
- Mockito.verify(retryRegistry).register(NODE_ID);
+ Mockito.verify(reconciliationRegistry).register(NODE_ID);
}
@Test
public void testSyncupConfigIgnoreInRetry() throws InterruptedException {
- Mockito.when(retryRegistry.isRegistered(NODE_ID)).thenReturn(true);
+ Mockito.when(reconciliationRegistry.isRegistered(NODE_ID)).thenReturn(true);
reactor.syncup(fcNodePath, fcConfigNode, fcOperationalNode, dsType);
@Captor
private ArgumentCaptor<ProcessFlatBatchInput> processFlatBatchInputCpt;
- private List<ItemSyncBox<Group>> groupsToAddOrUpdate;
- private List<ItemSyncBox<Group>> groupsToRemove;
- private ItemSyncBox<Meter> metersToAddOrUpdate;
- private ItemSyncBox<Meter> metersToRemove;
- private Map<TableKey, ItemSyncBox<Flow>> flowsToAddOrUpdate;
- private Map<TableKey, ItemSyncBox<Flow>> flowsToRemove;
+ private final List<ItemSyncBox<Group>> groupsToAddOrUpdate;
+ private final List<ItemSyncBox<Group>> groupsToRemove;
+ private final ItemSyncBox<Meter> metersToAddOrUpdate;
+ private final ItemSyncBox<Meter> metersToRemove;
+ private final Map<TableKey, ItemSyncBox<Flow>> flowsToAddOrUpdate;
+ private final Map<TableKey, ItemSyncBox<Flow>> flowsToRemove;
private List<Batch> batchBag;
private SyncPlanPushStrategyFlatBatchImpl syncPlanPushStrategy;
}
@Test
- public void testMapBachesToRanges() throws Exception {
+ public void testMapBatchesToRanges() throws Exception {
final List<Batch> inputBatchBag = Lists.newArrayList(
new BatchBuilder().setBatchOrder(0).build(),
new BatchBuilder().setBatchOrder(5).build(),
private SyncCrudCounters counters;
- private List<ItemSyncBox<Group>> groupsToAddOrUpdate;
- private List<ItemSyncBox<Group>> groupsToRemove;
- private ItemSyncBox<Meter> metersToAddOrUpdate;
- private ItemSyncBox<Meter> metersToRemove;
- private Map<TableKey, ItemSyncBox<Flow>> flowsToAddOrUpdate;
- private Map<TableKey, ItemSyncBox<Flow>> flowsToRemove;
+ private final List<ItemSyncBox<Group>> groupsToAddOrUpdate;
+ private final List<ItemSyncBox<Group>> groupsToRemove;
+ private final ItemSyncBox<Meter> metersToAddOrUpdate;
+ private final ItemSyncBox<Meter> metersToRemove;
+ private final Map<TableKey, ItemSyncBox<Flow>> flowsToAddOrUpdate;
+ private final Map<TableKey, ItemSyncBox<Flow>> flowsToRemove;
public SyncPlanPushStrategyIncrementalImplTest() {
groupsToAddOrUpdate = Lists.newArrayList(DiffInputFactory.createGroupSyncBox(1, 2, 3),
Assert.assertTrue(vehicle.get().isSuccessful());
}
- @Test
- public void testCreateRpcResultCondenser() throws Exception {
-
- }
-
/**
* add one missing group
*
}
return syncBox1;
}
-
- @Test
- public void testResolveMeterDiffs() throws Exception {
-
- }
-
- @Test
- public void testResolveFlowDiffsInTable() throws Exception {
-
- }
-
- @Test
- public void testResolveFlowDiffsInAllTables() throws Exception {
-
- }
}
\ No newline at end of file
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
/**
- * Test for {@link RetryRegistry}.
+ * Test for {@link ReconciliationRegistry}.
*/
@RunWith(MockitoJUnitRunner.class)
-public class RetryRegistryTest {
+public class ReconciliationRegistryTest {
private static final NodeId NODE_ID = new NodeId("testNode");
- private RetryRegistry retryRegistry;
+ private ReconciliationRegistry reconciliationRegistry;
@Before
public void setUp() throws Exception {
- retryRegistry = new RetryRegistry();
+ reconciliationRegistry = new ReconciliationRegistry();
}
@Test
public void testRegister() {
- Date timestamp = retryRegistry.register(NODE_ID);
- Assert.assertEquals(true, retryRegistry.isRegistered(NODE_ID));
+ Date timestamp = reconciliationRegistry.register(NODE_ID);
+ Assert.assertEquals(true, reconciliationRegistry.isRegistered(NODE_ID));
Assert.assertNotNull(timestamp);
}
@Test
public void testUnregisterIfRegistered() {
- retryRegistry.register(NODE_ID);
- Date timestamp = retryRegistry.unregisterIfRegistered(NODE_ID);
- Assert.assertEquals(false, retryRegistry.isRegistered(NODE_ID));
+ reconciliationRegistry.register(NODE_ID);
+ Date timestamp = reconciliationRegistry.unregisterIfRegistered(NODE_ID);
+ Assert.assertEquals(false, reconciliationRegistry.isRegistered(NODE_ID));
Assert.assertNotNull(timestamp);
}
@Test
public void testUnregisterIfNotRegistered() {
- Date timestamp = retryRegistry.unregisterIfRegistered(NODE_ID);
- Assert.assertEquals(false, retryRegistry.isRegistered(NODE_ID));
+ Date timestamp = reconciliationRegistry.unregisterIfRegistered(NODE_ID);
+ Assert.assertEquals(false, reconciliationRegistry.isRegistered(NODE_ID));
Assert.assertNull(timestamp);
}
private static class Worker implements Runnable {
private final SemaphoreKeeper<String> keeper;
private final String key;
- private ConcurrentMap<Integer, Integer> counter = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Integer, Integer> counter = new ConcurrentHashMap<>();
private volatile int index = 0;
public Worker(SemaphoreKeeper<String> keeper, final String key) {