<artifactId>model-inventory</artifactId>
</dependency>
- <dependency>
- <groupId>org.opendaylight.openflowplugin.model</groupId>
- <artifactId>model-flow-base</artifactId>
- </dependency>
-
<dependency>
<groupId>org.opendaylight.openflowplugin.model</groupId>
<artifactId>model-flow-service</artifactId>
/**
* @param resultVehicle bootstrap future - execution will chain it's async calls to this one
* @param diffInput wraps all diff data required for any strategy ({add,remove,update} x {flow,group,meter})
- * @param counters reference to internal one-shot statistics - summary off successfully pushed items shall be recorded here
+ * @param counters reference to internal one-shot statistics - summary off successfully pushed items
+ * shall be recorded here
* @return last future of the chain
*/
ListenableFuture<RpcResult<Void>> executeSyncStrategy(ListenableFuture<RpcResult<Void>> resultVehicle,
* @param dsType type of DS change
* @return synchronization outcome
*/
- ListenableFuture<Boolean> syncup(InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
- FlowCapableNode configTree, FlowCapableNode operationalTree,
- LogicalDatastoreType dsType) throws InterruptedException;
-
+ ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
+ final FlowCapableNode configTree, final FlowCapableNode operationalTree,
+ final LogicalDatastoreType dsType) throws InterruptedException;
}
import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.openflowplugin.applications.frsync.NodeListener;
import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
if (optFuture.isPresent()) {
final ListenableFuture<Boolean> future = optFuture.get();
final Boolean ret = future.get(15000, TimeUnit.MILLISECONDS);
- LOG.debug("syncup ret {} {} {} thread:{}", dsType(), ret, nodeId, threadName());
+ LOG.debug("syncup ret {} {} {} thread:{}", dsType(), ret, nodeId.getValue(), threadName());
}
} catch (InterruptedException e) {
- LOG.warn("permit for forwarding rules sync not acquired: {}", nodeId);
+ LOG.warn("permit for forwarding rules sync not acquired: {}", nodeId.getValue());
} catch (Exception e) {
- LOG.error("error processing inventory node modification: {}", nodeId, e);
+ LOG.error("error processing inventory node modification: {}", nodeId.getValue(), e);
}
}
}
protected abstract Optional<ListenableFuture<Boolean>> processNodeModification(
- DataTreeModification<T> modification) throws ReadFailedException, InterruptedException;
+ DataTreeModification<T> modification) throws InterruptedException;
protected abstract LogicalDatastoreType dsType();
public class FlowForwarder implements ForwardingRulesCommitter<Flow, AddFlowOutput, RemoveFlowOutput, UpdateFlowOutput> {
private static final Logger LOG = LoggerFactory.getLogger(FlowForwarder.class);
- private SalFlowService salFlowService;
+ private final SalFlowService salFlowService;
public FlowForwarder(final SalFlowService salFlowService) {
this.salFlowService = salFlowService;
final InstanceIdentifier<FlowCapableNode> nodeIdent) {
LOG.trace("Forwarding Flow REMOVE request Tbl id, node Id {} {}",
identifier, nodeIdent);
-
+
final TableKey tableKey = identifier.firstKeyOf(Table.class, TableKey.class);
if (tableIdValidationPrecondition(tableKey, removeDataObj)) {
final RemoveFlowInputBuilder builder = new RemoveFlowInputBuilder(removeDataObj);
final InstanceIdentifier<FlowCapableNode> nodeIdent) {
LOG.trace("Forwarding Flow UPDATE request [Tbl id, node Id {} {} {}",
identifier, nodeIdent, update);
-
+
final Future<RpcResult<UpdateFlowOutput>> output;
final TableKey tableKey = identifier.firstKeyOf(Table.class, TableKey.class);
if (tableIdValidationPrecondition(tableKey, update)) {
final InstanceIdentifier<FlowCapableNode> nodeIdent) {
LOG.trace("Forwarding the Flow ADD request [Tbl id, node Id {} {} {}",
identifier, nodeIdent, addDataObj);
-
+
final Future<RpcResult<AddFlowOutput>> output;
final TableKey tableKey = identifier.firstKeyOf(Table.class, TableKey.class);
if (tableIdValidationPrecondition(tableKey, addDataObj)) {
public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareProvider {
private static final Logger LOG = LoggerFactory.getLogger(ForwardingRulesSyncProvider.class);
- public static final int STARTUP_LOOP_TICK = 500;
- public static final int STARTUP_LOOP_MAX_RETRIES = 8;
+ private static final int STARTUP_LOOP_TICK = 500;
+ private static final int STARTUP_LOOP_MAX_RETRIES = 8;
private final DataBroker dataService;
private final SalTableService salTableService;
private final SalFlatBatchService flatBatchService;
- /** wildcard path to flow-capable-node augmentation of inventory node */
+ /** Wildcard path to flow-capable-node augmentation of inventory node. */
private static final InstanceIdentifier<FlowCapableNode> FLOW_CAPABLE_NODE_WC_PATH =
InstanceIdentifier.create(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class);
- /** wildcard path to node (not flow-capable-node augmentation) of inventory node */
+ /** Wildcard path to node (not flow-capable-node augmentation) of inventory node. */
private static final InstanceIdentifier<Node> NODE_WC_PATH =
InstanceIdentifier.create(Nodes.class).child(Node.class);
}\r
\r
/**\r
- * This will be rewritten in JUnits using SynchronousExecutorService\r
+ * This will be rewritten in JUnits using SynchronousExecutorService.\r
*/\r
@VisibleForTesting // should not be private and final\r
static PceExecutorsFactory DEFAULT_EXECUTORS = new PceExecutorsFactory() {\r
LOG.trace("Received the Meter REMOVE request [Tbl id, node Id {} {}",
identifier, nodeIdent);
-
+
final RemoveMeterInputBuilder builder = new RemoveMeterInputBuilder(removeDataObj);
builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
final InstanceIdentifier<FlowCapableNode> nodeIdent) {
LOG.trace("Received the Meter UPDATE request [Tbl id, node Id {} {} {}",
identifier, nodeIdent, update);
-
+
final UpdateMeterInputBuilder builder = new UpdateMeterInputBuilder();
builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
final InstanceIdentifier<FlowCapableNode> nodeIdent) {
LOG.trace("Received the Meter ADD request [Tbl id, node Id {} {} {}",
identifier, nodeIdent, addDataObj);
-
+
final AddMeterInputBuilder builder = new AddMeterInputBuilder(addDataObj);
builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
LOG.trace("Inventory Config changes {}", modifications.size());
super.onDataTreeChanged(modifications);
}
-
+
/**
* Compare cached operational with current config modification. If operational is not present
* skip calling Inventory RPCs.
- *
* @throws InterruptedException from syncup
*/
protected Optional<ListenableFuture<Boolean>> processNodeModification(
final FlowCapableNode dataAfter = configModification.getDataAfter();
final ListenableFuture<Boolean> endResult;
if (dataBefore == null && dataAfter != null) {
- endResult = onNodeAdded(nodePath, dataBefore, dataAfter, operationalNode.get());
+ endResult = onNodeAdded(nodePath, dataAfter, operationalNode.get());
} else if (dataBefore != null && dataAfter == null) {
- endResult = onNodeDeleted(nodePath, dataBefore, operationalNode.get());
+ endResult = onNodeDeleted(nodePath, dataBefore);
} else {
- endResult = onNodeUpdated(nodePath, dataBefore, dataAfter, operationalNode.get());
+ endResult = onNodeUpdated(nodePath, dataBefore, dataAfter);
}
return Optional.of(endResult);
* optimal case (but the switch could be reprogrammed by another person/system.</li>
* </ul>
*/
- protected ListenableFuture<Boolean> onNodeAdded(InstanceIdentifier<FlowCapableNode> nodePath,
- FlowCapableNode dataBefore, FlowCapableNode dataAfter, FlowCapableNode operationalNode)
- throws InterruptedException {
- LOG.trace("onNodeAdded {}", nodePath);
+ private ListenableFuture<Boolean> onNodeAdded(InstanceIdentifier<FlowCapableNode> nodePath,
+ FlowCapableNode dataAfter, FlowCapableNode operationalNode) throws InterruptedException {
+ NodeId nodeId = PathUtil.digNodeId(nodePath);
+ LOG.trace("onNodeAdded {}", nodeId);
final ListenableFuture<Boolean> endResult = reactor.syncup(nodePath, dataAfter, operationalNode, dsType());
return endResult;
}
* system which is updating operational store (that components is also trying to solve
* scale/performance issues on several layers).
*/
- protected ListenableFuture<Boolean> onNodeUpdated(InstanceIdentifier<FlowCapableNode> nodePath,
- FlowCapableNode dataBefore, FlowCapableNode dataAfter, FlowCapableNode operationalNodeNode)
- throws InterruptedException {
- LOG.trace("onNodeUpdated {}", nodePath);
+ private ListenableFuture<Boolean> onNodeUpdated(InstanceIdentifier<FlowCapableNode> nodePath,
+ FlowCapableNode dataBefore, FlowCapableNode dataAfter) throws InterruptedException {
+ NodeId nodeId = PathUtil.digNodeId(nodePath);
+ LOG.trace("onNodeUpdated {}", nodeId);
final ListenableFuture<Boolean> endResult = reactor.syncup(nodePath, dataAfter, dataBefore, dsType());
return endResult;
}
* probably optimized using dedicated wipe-out RPC, but it has impact on switch if it is
* programmed by two person/system
*/
- protected ListenableFuture<Boolean> onNodeDeleted(InstanceIdentifier<FlowCapableNode> nodePath,
- FlowCapableNode dataBefore, FlowCapableNode operationalNode) throws InterruptedException {
- LOG.trace("onNodeDeleted {}", nodePath);
+ private ListenableFuture<Boolean> onNodeDeleted(InstanceIdentifier<FlowCapableNode> nodePath,
+ FlowCapableNode dataBefore) throws InterruptedException {
+ NodeId nodeId = PathUtil.digNodeId(nodePath);
+ LOG.trace("onNodeDeleted {}", nodeId);
final ListenableFuture<Boolean> endResult = reactor.syncup(nodePath, null, dataBefore, dsType());
return endResult;
}
import org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType;
import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
public class SimplifiedOperationalListener extends AbstractFrmSyncListener<Node> {
private static final Logger LOG = LoggerFactory.getLogger(SimplifiedOperationalListener.class);
- protected final SyncReactor reactor;
- protected final FlowCapableNodeSnapshotDao operationalSnapshot;
- protected final FlowCapableNodeDao configDao;
+ private final SyncReactor reactor;
+ private final FlowCapableNodeSnapshotDao operationalSnapshot;
+ private final FlowCapableNodeDao configDao;
public SimplifiedOperationalListener(SyncReactor reactor, FlowCapableNodeSnapshotDao operationalSnapshot,
FlowCapableNodeDao configDao) {
* @throws InterruptedException from syncup
*/
protected Optional<ListenableFuture<Boolean>> processNodeModification(
- DataTreeModification<Node> modification) throws ReadFailedException, InterruptedException {
+ DataTreeModification<Node> modification) throws InterruptedException {
updateCache(modification);
if (isReconciliationNeeded(modification)) {
}
/**
- * ModificationType.DELETE
+ * ModificationType.DELETE.
*/
private boolean isDelete(DataTreeModification<Node> modification) {
if (ModificationType.DELETE == modification.getRootNode().getModificationType()) {
}
}
- static FlowCapableNode flowCapableNodeAfter(DataTreeModification<Node> modification) {
+ private static FlowCapableNode flowCapableNodeAfter(DataTreeModification<Node> modification) {
final Node dataAfter = modification.getRootNode().getDataAfter();
if (dataAfter == null) {
return null;
return dataAfter.getAugmentation(FlowCapableNode.class);
}
- static boolean safeConnectorsEmpty(Node node) {
+ private static boolean safeConnectorsEmpty(Node node) {
if (node == null) {
return true;
}
return nodeConnectors == null || nodeConnectors.isEmpty();
}
- static String nodeIdValue(DataTreeModification<Node> modification) {
+ private static String nodeIdValue(DataTreeModification<Node> modification) {
final NodeId nodeId = nodeId(modification);
if (nodeId == null) {
package org.opendaylight.openflowplugin.applications.frsync.impl;
-import com.google.common.annotations.VisibleForTesting;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
* @param modification operational modification
* @return true if device is registered for retry and actual modification is consistent, false otherwise
*/
- @VisibleForTesting
- boolean isRegisteredAndConsistentForRetry(DataTreeModification<Node> modification) {
+ private boolean isRegisteredAndConsistentForRetry(DataTreeModification<Node> modification) {
final NodeId nodeId = PathUtil.digNodeId(modification.getRootPath().getRootIdentifier());
if (!retryRegistry.isRegistered(nodeId)) {
}
try {
- Date timestampOfRegistration = retryRegistry.getRegistration(nodeId);;
+ Date timestampOfRegistration = retryRegistry.getRegistration(nodeId);
final SimpleDateFormat simpleDateFormat = new SimpleDateFormat(RetryRegistry.DATE_AND_TIME_FORMAT);
Date timestampOfStatistics = simpleDateFormat.parse(gatheringStatusEnd.getEnd().getValue());
if (timestampOfStatistics.after(timestampOfRegistration)) {
public class SyncReactorFutureDecorator implements SyncReactor {\r
\r
private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureDecorator.class);\r
-\r
+ public static final String FRM_RPC_CLIENT_PREFIX = "FRM-RPC-client-";\r
private final SyncReactor delegate;\r
private final ListeningExecutorService executorService;\r
\r
- public static final String FRM_RPC_CLIENT_PREFIX = "FRM-RPC-client-";\r
-\r
public SyncReactorFutureDecorator(SyncReactor delegate, ListeningExecutorService executorService) {\r
this.delegate = delegate;\r
this.executorService = executorService;\r
final FlowCapableNode configTree, final FlowCapableNode operationalTree,\r
final LogicalDatastoreType dsType) throws InterruptedException {\r
final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
- LOG.trace("syncup {}", nodeId.getValue());\r
+ LOG.trace("syncup future {}", nodeId.getValue());\r
\r
final ListenableFuture<Boolean> syncup = executorService.submit(new Callable<Boolean>() {\r
public Boolean call() throws Exception {\r
final FlowCapableNode configTree, final FlowCapableNode operationalTree,\r
final LogicalDatastoreType dsType) throws InterruptedException {\r
final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
- LOG.trace("doSyncupInFuture {}", nodeId.getValue());\r
+ LOG.trace("doSyncupInFuture future {}", nodeId.getValue());\r
\r
return delegate.syncup(flowcapableNodePath, configTree, operationalTree, dsType);\r
}\r
\r
- static String threadName() {\r
- final Thread currentThread = Thread.currentThread();\r
- return currentThread.getName();\r
- }\r
-\r
- protected String updateThreadName(NodeId nodeId) {\r
+ private String updateThreadName(NodeId nodeId) {\r
final Thread currentThread = Thread.currentThread();\r
final String oldName = currentThread.getName();\r
try {\r
return oldName;\r
}\r
\r
- protected String updateThreadName(String name) {\r
+ private String updateThreadName(String name) {\r
final Thread currentThread = Thread.currentThread();\r
final String oldName = currentThread.getName();\r
try {\r
package org.opendaylight.openflowplugin.applications.frsync.impl;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureZipDecorator.class);
@GuardedBy("compressionGuard")
- protected final Map<InstanceIdentifier<FlowCapableNode>, ZipQueueEntry> compressionQueue = new HashMap<>();
- protected final Semaphore compressionGuard = new Semaphore(1, false);
+ private final Map<InstanceIdentifier<FlowCapableNode>, ZipQueueEntry> compressionQueue = new HashMap<>();
+ private final Semaphore compressionGuard = new Semaphore(1, false);
public SyncReactorFutureZipDecorator(SyncReactor delegate, ListeningExecutorService executorService) {
super(delegate, executorService);
* If there is config delta in compression queue for the device and new configuration is coming,
* update its zip queue entry. Create/replace zip queue entry for the device with operational delta otherwise.
*/
- protected boolean updateCompressionState(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
- final FlowCapableNode configTree, final FlowCapableNode operationalTree,
- final LogicalDatastoreType dsType) {
+ private boolean updateCompressionState(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
+ final FlowCapableNode configTree, final FlowCapableNode operationalTree,
+ final LogicalDatastoreType dsType) {
final ZipQueueEntry previousEntry = compressionQueue.get(flowcapableNodePath);
if (previousEntry != null && dsType == LogicalDatastoreType.CONFIGURATION
}
}
- @VisibleForTesting
- Map<InstanceIdentifier<FlowCapableNode>, ZipQueueEntry> getCompressionQueue() {
- return compressionQueue;
- }
}
\ No newline at end of file
final FlowCapableNode configTree, final FlowCapableNode operationalTree,\r
final LogicalDatastoreType dsType) throws InterruptedException {\r
final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
- LOG.trace("syncup {}", nodeId.getValue());\r
+ LOG.trace("syncup guard {}", nodeId.getValue());\r
\r
final long stampBeforeGuard = System.nanoTime();\r
final Semaphore guard = summonGuardAndAcquire(flowcapableNodePath);//TODO handle InteruptedException\r
public void onSuccess(@Nullable final Boolean result) {\r
if (LOG.isDebugEnabled()) {\r
final long stampFinished = System.nanoTime();\r
- LOG.debug("syncup finished {} took:{} rpc:{} wait:{} guard:{}, thread:{}", nodeId.getValue(),\r
+ LOG.debug("syncup finished {} took:{} rpc:{} wait:{} guard:{} permits thread:{}", nodeId.getValue(),\r
formatNanos(stampFinished - stampBeforeGuard),\r
formatNanos(stampFinished - stampAfterGuard),\r
formatNanos(stampAfterGuard - stampBeforeGuard),\r
- guard, threadName());\r
+ guard.availablePermits(), threadName());\r
}\r
\r
- releaseGuardForNodeId(nodeId, guard);\r
+ releaseGuardForNodeId(guard);\r
}\r
\r
@Override\r
public void onFailure(final Throwable t) {\r
if (LOG.isDebugEnabled()) {\r
final long stampFinished = System.nanoTime();\r
- LOG.warn("syncup failed {} took:{} rpc:{} wait:{} guard:{} thread:{}", nodeId.getValue(),\r
+ LOG.warn("syncup failed {} took:{} rpc:{} wait:{} guard:{} permits thread:{}", nodeId.getValue(),\r
formatNanos(stampFinished - stampBeforeGuard),\r
formatNanos(stampFinished - stampAfterGuard),\r
formatNanos(stampAfterGuard - stampBeforeGuard),\r
- guard, threadName());\r
+ guard.availablePermits(), threadName());\r
}\r
\r
- releaseGuardForNodeId(nodeId, guard);\r
+ releaseGuardForNodeId(guard);\r
}\r
});\r
return endResult;\r
- } catch(InterruptedException e) {\r
- releaseGuardForNodeId(nodeId, guard);\r
+ } catch (InterruptedException e) {\r
+ releaseGuardForNodeId(guard);\r
throw e;\r
}\r
}\r
\r
- protected String formatNanos(long nanos) {\r
+ private String formatNanos(long nanos) {\r
return "'" + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms'";\r
}\r
\r
/**\r
- * get guard\r
- *\r
+ * Get guard and lock for node.\r
* @param flowcapableNodePath II of node for which guard should be acquired\r
* @return semaphore guard\r
*/\r
- protected Semaphore summonGuardAndAcquire(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath)\r
+ private Semaphore summonGuardAndAcquire(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath)\r
throws InterruptedException {\r
final Semaphore guard = Preconditions.checkNotNull(semaphoreKeeper.summonGuard(flowcapableNodePath),\r
"no guard for " + flowcapableNodePath);\r
}\r
\r
/**\r
- * unlock per node\r
- *\r
- * @param nodeId NodeId of node which should be unlocked\r
- * @param guard semaphore guard\r
+ * Unlock and release guard.\r
+ * @param guard semaphore guard which should be unlocked\r
*/\r
- protected void releaseGuardForNodeId(final NodeId nodeId, final Semaphore guard) {\r
+ private void releaseGuardForNodeId(final Semaphore guard) {\r
if (guard == null) {\r
return;\r
}\r
guard.release();\r
}\r
\r
- static String threadName() {\r
+ private static String threadName() {\r
final Thread currentThread = Thread.currentThread();\r
return currentThread.getName();\r
}\r
final FlowCapableNode configTree, final FlowCapableNode operationalTree,
final LogicalDatastoreType dsType) {
- LOG.trace("syncup {} cfg:{} oper:{}", nodeIdent, configTree == null ? "is null" : "non null", operationalTree == null ? "is null" : "non null");
+ final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
+ LOG.trace("syncup impl {} cfg:{} oper:{}", nodeId.getValue(), configTree == null ? "is null" : "non null", operationalTree == null ? "is null" : "non null");
final SyncCrudCounters counters = new SyncCrudCounters();
+
/**
* instructions:
* - extract diff changes and prepare change steps in safe order
* - flows - meters - groups (reordered)
**/
- final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
-
final List<ItemSyncBox<Group>> groupsToAddOrUpdate = extractGroupsToAddOrUpdate(nodeId, configTree, operationalTree);
final ItemSyncBox<Meter> metersToAddOrUpdate = extractMetersToAddOrUpdate(nodeId, configTree, operationalTree);
final Map<TableKey, ItemSyncBox<Flow>> flowsToAddOrUpdate = extractFlowsToAddOrUpdate(nodeId, configTree, operationalTree);
}
@VisibleForTesting
- static List<ItemSyncBox<Group>> extractGroupsToAddOrUpdate(final NodeId nodeId,
- final FlowCapableNode flowCapableNodeConfigured,
- final FlowCapableNode flowCapableNodeOperational) {
+ private static List<ItemSyncBox<Group>> extractGroupsToAddOrUpdate(final NodeId nodeId,
+ final FlowCapableNode flowCapableNodeConfigured,
+ final FlowCapableNode flowCapableNodeOperational) {
final List<Group> groupsConfigured = ReconcileUtil.safeGroups(flowCapableNodeConfigured);
final List<Group> groupsOperational = ReconcileUtil.safeGroups(flowCapableNodeOperational);
final Map<Long, Group> groupOperationalMap = FlowCapableNodeLookups.wrapGroupsToMap(groupsOperational);
}
@VisibleForTesting
- static ItemSyncBox<Meter> extractMetersToAddOrUpdate(final NodeId nodeId,
- final FlowCapableNode flowCapableNodeConfigured,
- final FlowCapableNode flowCapableNodeOperational) {
+ private static ItemSyncBox<Meter> extractMetersToAddOrUpdate(final NodeId nodeId,
+ final FlowCapableNode flowCapableNodeConfigured,
+ final FlowCapableNode flowCapableNodeOperational) {
final List<Meter> metersConfigured = ReconcileUtil.safeMeters(flowCapableNodeConfigured);
final List<Meter> metersOperational = ReconcileUtil.safeMeters(flowCapableNodeOperational);
final Map<MeterId, Meter> meterOperationalMap = FlowCapableNodeLookups.wrapMetersToMap(metersOperational);
}
@VisibleForTesting
- static Map<TableKey, ItemSyncBox<Flow>> extractFlowsToAddOrUpdate(final NodeId nodeId,
- final FlowCapableNode flowCapableNodeConfigured,
- final FlowCapableNode flowCapableNodeOperational) {
+ private static Map<TableKey, ItemSyncBox<Flow>> extractFlowsToAddOrUpdate(final NodeId nodeId,
+ final FlowCapableNode flowCapableNodeConfigured,
+ final FlowCapableNode flowCapableNodeOperational) {
final List<Table> tablesConfigured = ReconcileUtil.safeTables(flowCapableNodeConfigured);
if (tablesConfigured.isEmpty()) {
return Collections.emptyMap();
}
@VisibleForTesting
- static Map<TableKey, ItemSyncBox<Flow>> extractFlowsToRemove(final NodeId nodeId,
- final FlowCapableNode flowCapableNodeConfigured,
- final FlowCapableNode flowCapableNodeOperational) {
+ private static Map<TableKey, ItemSyncBox<Flow>> extractFlowsToRemove(final NodeId nodeId,
+ final FlowCapableNode flowCapableNodeConfigured,
+ final FlowCapableNode flowCapableNodeOperational) {
final List<Table> tablesOperational = ReconcileUtil.safeTables(flowCapableNodeOperational);
if (tablesOperational.isEmpty()) {
return Collections.emptyMap();
}
@VisibleForTesting
- static ItemSyncBox<Meter> extractMetersToRemove(final NodeId nodeId,
- final FlowCapableNode flowCapableNodeConfigured,
- final FlowCapableNode flowCapableNodeOperational) {
+ private static ItemSyncBox<Meter> extractMetersToRemove(final NodeId nodeId,
+ final FlowCapableNode flowCapableNodeConfigured,
+ final FlowCapableNode flowCapableNodeOperational) {
final List<Meter> metersConfigured = ReconcileUtil.safeMeters(flowCapableNodeConfigured);
final List<Meter> metersOperational = ReconcileUtil.safeMeters(flowCapableNodeOperational);
final Map<MeterId, Meter> meterConfiguredMap = FlowCapableNodeLookups.wrapMetersToMap(metersConfigured);
}
@VisibleForTesting
- static List<ItemSyncBox<Group>> extractGroupsToRemove(final NodeId nodeId,
- final FlowCapableNode flowCapableNodeConfigured,
- final FlowCapableNode flowCapableNodeOperational) {
+ private static List<ItemSyncBox<Group>> extractGroupsToRemove(final NodeId nodeId,
+ final FlowCapableNode flowCapableNodeConfigured,
+ final FlowCapableNode flowCapableNodeOperational) {
final List<Group> groupsConfigured = ReconcileUtil.safeGroups(flowCapableNodeConfigured);
final List<Group> groupsOperational = ReconcileUtil.safeGroups(flowCapableNodeOperational);
final Map<Long, Group> groupConfiguredMap = FlowCapableNodeLookups.wrapGroupsToMap(groupsConfigured);
/**
* Adding retry mechanism in case of unsuccessful syncup.
*/
-public class SyncReactorRetryDecorator implements SyncReactor{
+public class SyncReactorRetryDecorator implements SyncReactor {
private static final Logger LOG = LoggerFactory.getLogger(SyncReactorRetryDecorator.class);
private final SyncReactor delegate;
private final RetryRegistry retryRegistry;
- public SyncReactorRetryDecorator (final SyncReactor delegate, RetryRegistry retryRegistry) {
+ public SyncReactorRetryDecorator(final SyncReactor delegate, RetryRegistry retryRegistry) {
this.delegate = delegate;
this.retryRegistry = retryRegistry;
}
- public ListenableFuture<Boolean> syncup (final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
- final FlowCapableNode configTree, final FlowCapableNode operationalTree,
- final LogicalDatastoreType dsType) throws InterruptedException {
+ public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
+ final FlowCapableNode configTree, final FlowCapableNode operationalTree,
+ final LogicalDatastoreType dsType) throws InterruptedException {
final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
LOG.trace("syncup retry {}", nodeId.getValue());
}
- protected ListenableFuture<RpcResult<Void>> addMissingFlows(final NodeId nodeId,
+ ListenableFuture<RpcResult<Void>> addMissingFlows(final NodeId nodeId,
final InstanceIdentifier<FlowCapableNode> nodeIdent,
final Map<TableKey, ItemSyncBox<Flow>> flowsInTablesSyncBox,
final SyncCrudCounters counters) {
*/
}
- protected ListenableFuture<RpcResult<Void>> removeRedundantFlows(final NodeId nodeId,
+ ListenableFuture<RpcResult<Void>> removeRedundantFlows(final NodeId nodeId,
final InstanceIdentifier<FlowCapableNode> nodeIdent,
final Map<TableKey, ItemSyncBox<Flow>> removalPlan,
final SyncCrudCounters counters) {
}
- protected ListenableFuture<RpcResult<Void>> removeRedundantMeters(final NodeId nodeId,
+ ListenableFuture<RpcResult<Void>> removeRedundantMeters(final NodeId nodeId,
final InstanceIdentifier<FlowCapableNode> nodeIdent,
final ItemSyncBox<Meter> meterRemovalPlan,
final SyncCrudCounters counters) {
PathUtil.digNodePath(nodeIdent), transactionService));
}
- protected ListenableFuture<RpcResult<Void>> addMissingMeters(final NodeId nodeId,
+ ListenableFuture<RpcResult<Void>> addMissingMeters(final NodeId nodeId,
final InstanceIdentifier<FlowCapableNode> nodeIdent,
final ItemSyncBox<Meter> syncBox,
final SyncCrudCounters counters) {
*/
}
- protected ListenableFuture<RpcResult<Void>> addMissingGroups(final NodeId nodeId,
+ ListenableFuture<RpcResult<Void>> addMissingGroups(final NodeId nodeId,
final InstanceIdentifier<FlowCapableNode> nodeIdent,
final List<ItemSyncBox<Group>> groupsAddPlan,
final SyncCrudCounters counters) {
public class SynchronizationDiffInput {
private final InstanceIdentifier<FlowCapableNode> nodeIdent;
- final List<ItemSyncBox<Group>> groupsToAddOrUpdate;
- final ItemSyncBox<Meter> metersToAddOrUpdate;
- final Map<TableKey, ItemSyncBox<Flow>> flowsToAddOrUpdate;
- final Map<TableKey, ItemSyncBox<Flow>> flowsToRemove;
- final ItemSyncBox<Meter> metersToRemove;
- final List<ItemSyncBox<Group>> groupsToRemove;
+ private final List<ItemSyncBox<Group>> groupsToAddOrUpdate;
+ private final ItemSyncBox<Meter> metersToAddOrUpdate;
+ private final Map<TableKey, ItemSyncBox<Flow>> flowsToAddOrUpdate;
+ private final Map<TableKey, ItemSyncBox<Flow>> flowsToRemove;
+ private final ItemSyncBox<Meter> metersToRemove;
+ private final List<ItemSyncBox<Group>> groupsToRemove;
public SynchronizationDiffInput(final InstanceIdentifier<FlowCapableNode> nodeIdent,
final List<ItemSyncBox<Group>> groupsToAddOrUpdate,
return flowMap;
}
-
+
public static Flow flowMapLookupExisting(Flow flow, Map<SwitchFlowId, Flow> flowConfigMap) {
return flowConfigMap.get(new SwitchFlowId(flow));
}
}
/**
- * Tuple holder for original and updated item
- *
+ * Tuple holder for original and updated item.
* @param <I> basic type
*/
public static final class ItemUpdateTuple<I> {
public Semaphore summonGuard(final @Nonnull K key) {
return semaphoreCache.getUnchecked(key);
}
-
+
@Override
public String toString() {
return super.toString() + " size:" + (semaphoreCache == null ? null : semaphoreCache.size()) + " " + semaphoreCache;
package org.opendaylight.openflowplugin.applications.frsync.util;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.openflowplugin.applications.frsync.impl.SyncReactorFutureZipDecorator;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
/**
- * Simple compression queue entry for {@link org.opendaylight.openflowplugin.applications.frsync.impl.SyncReactorFutureZipDecorator}.
+ * Simple compression queue entry for {@link SyncReactorFutureZipDecorator}.
*/
public class ZipQueueEntry {
- private FlowCapableNode after;
- private FlowCapableNode before;
- private LogicalDatastoreType dsTypeBefore;
+ private final FlowCapableNode after;
+ private final FlowCapableNode before;
+ private final LogicalDatastoreType dsTypeBefore;
- public ZipQueueEntry(final FlowCapableNode after, final FlowCapableNode before, final LogicalDatastoreType dsTypeBefore) {
+ public ZipQueueEntry(final FlowCapableNode after, final FlowCapableNode before,
+ final LogicalDatastoreType dsTypeBefore) {
this.after = after;
this.before = before;
this.dsTypeBefore = dsTypeBefore;
-
}
public FlowCapableNode getLeft() {
private static final NodeId NODE_ID = new NodeId("test-node");
private SyncReactorRetryDecorator reactor;
private InstanceIdentifier<FlowCapableNode> fcNodePath;
- final LogicalDatastoreType dsType = LogicalDatastoreType.CONFIGURATION;
+ private final LogicalDatastoreType dsType = LogicalDatastoreType.CONFIGURATION;
@Mock
private SyncReactorImpl delegate;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Test for {@link SyncPlanPushStrategyIncrementalImpl}.
@RunWith(MockitoJUnitRunner.class)
public class SyncPlanPushStrategyIncrementalImplTest {
- private static final Logger LOG = LoggerFactory.getLogger(SyncPlanPushStrategyIncrementalImplTest.class);
-
private static final NodeId NODE_ID = new NodeId("unit-nodeId");
private static final InstanceIdentifier<FlowCapableNode> NODE_IDENT = InstanceIdentifier.create(Nodes.class)
.child(Node.class, new NodeKey(NODE_ID)).augmentation(FlowCapableNode.class);
public class ReconcileUtilTest {
private static final NodeId NODE_ID = new NodeId("unit-node-id");
- private InstanceIdentifier<Node> NODE_IDENT = InstanceIdentifier.create(Nodes.class)
+ private final InstanceIdentifier<Node> NODE_IDENT = InstanceIdentifier.create(Nodes.class)
.child(Node.class, new NodeKey(NODE_ID));
private static final Splitter COMMA_SPLITTER = Splitter.on(",");
for (int i = 0; i < steps; i++) {
executorService.submit(task);
}
- Thread.sleep(1000L);
+ Thread.sleep(100L);
System.gc();
executorService.shutdown();