X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=applications%2Fforwardingrules-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fapplications%2Ffrm%2Fimpl%2FFlowNodeReconciliationImpl.java;h=dc620b6790a92dbda42c26a2f57bbed0960e8c92;hb=f35e2fb153438697238b3095e8323b8a343c1c93;hp=2b62037b373c20eade1284e0ff6d14fdcd43a556;hpb=d30249f84eaf9878cf68fb76c46f7e99587f4e57;p=openflowplugin.git

diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowNodeReconciliationImpl.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowNodeReconciliationImpl.java
index 2b62037b37..dc620b6790 100644
--- a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowNodeReconciliationImpl.java
+++ b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowNodeReconciliationImpl.java
@@ -8,27 +8,22 @@ package org.opendaylight.openflowplugin.applications.frm.impl;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Set;
+
 import java.util.concurrent.Callable;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+
+import org.opendaylight.controller.md.sal.binding.api.*;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation;
@@ -65,12 +60,13 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.Meter
 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeaturesKey;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 /**
  * forwardingrules-manager
@@ -92,7 +88,16 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
 
     private final ForwardingRulesManager provider;
     public static final String SEPARATOR = ":";
-    private ListenerRegistration<DataChangeListener> listenerRegistration;
+    private ListenerRegistration<FlowNodeReconciliationImpl> listenerRegistration;
+
+    private final int THREAD_POOL_SIZE = 4;
+    ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
+
+    private static final InstanceIdentifier<FlowCapableNode> II_TO_FLOW_CAPABLE_NODE
+            = InstanceIdentifier.builder(Nodes.class)
+            .child(Node.class)
+            .augmentation(FlowCapableNode.class)
+            .build();
 
     public FlowNodeReconciliationImpl (final ForwardingRulesManager manager, final DataBroker db) {
         this.provider = Preconditions.checkNotNull(manager, "ForwardingRulesManager can not be null!");
@@ -101,14 +106,17 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
         final InstanceIdentifier<FlowCapableNode> flowNodeWildCardIdentifier = InstanceIdentifier.create(Nodes.class)
                 .child(Node.class).augmentation(FlowCapableNode.class);
 
+        final DataTreeIdentifier<FlowCapableNode> treeId =
+                new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, flowNodeWildCardIdentifier);
+
+        try {
         SimpleTaskRetryLooper looper = new SimpleTaskRetryLooper(ForwardingRulesManagerImpl.STARTUP_LOOP_TICK,
                 ForwardingRulesManagerImpl.STARTUP_LOOP_MAX_RETRIES);
-        try {
-            listenerRegistration = looper.loopUntilNoException(new Callable<ListenerRegistration<DataChangeListener>>() {
+
+        listenerRegistration = looper.loopUntilNoException(new Callable<ListenerRegistration<FlowNodeReconciliationImpl>>() {
                 @Override
-                public ListenerRegistration<DataChangeListener> call() throws Exception {
-                    return db.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
-                            flowNodeWildCardIdentifier, FlowNodeReconciliationImpl.this, DataChangeScope.BASE);
+                public ListenerRegistration<FlowNodeReconciliationImpl> call() throws Exception {
+                    return dataBroker.registerDataTreeChangeListener(treeId, FlowNodeReconciliationImpl.this);
                 }
             });
         } catch (Exception e) {
@@ -132,46 +140,60 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
     }
 
     @Override
-    public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changeEvent) {
-        Preconditions.checkNotNull(changeEvent,"Async ChangeEvent can not be null!");
-        /* All DataObjects for create */
-        final Set<InstanceIdentifier<?>> createdData = changeEvent.getCreatedData() != null
-                ? changeEvent.getCreatedData().keySet() : Collections.<InstanceIdentifier<?>> emptySet();
-        /* All DataObjects for remove */
-        final Set<InstanceIdentifier<?>> removeData = changeEvent.getRemovedPaths() != null
-                ? changeEvent.getRemovedPaths() : Collections.<InstanceIdentifier<?>> emptySet();
-        /* All updated DataObjects */
-        final Map<InstanceIdentifier<?>, DataObject> updateData = changeEvent.getUpdatedData() != null
-                ? changeEvent.getUpdatedData() : Collections.<InstanceIdentifier<?>, DataObject>emptyMap();
-
-        for (InstanceIdentifier<?> entryKey : removeData) {
-            final InstanceIdentifier<FlowCapableNode> nodeIdent = entryKey
-                    .firstIdentifierOf(FlowCapableNode.class);
-            if ( ! nodeIdent.isWildcarded()) {
-                flowNodeDisconnected(nodeIdent);
+    public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<FlowCapableNode>> changes) {
+        Preconditions.checkNotNull(changes, "Changes may not be null!");
+
+        for (DataTreeModification<FlowCapableNode> change : changes) {
+            final InstanceIdentifier<FlowCapableNode> key = change.getRootPath().getRootIdentifier();
+            final DataObjectModification<FlowCapableNode> mod = change.getRootNode();
+            final InstanceIdentifier<FlowCapableNode> nodeIdent =
+                    key.firstIdentifierOf(FlowCapableNode.class);
+
+            switch (mod.getModificationType()) {
+                case DELETE:
+                    if (mod.getDataAfter() == null) {
+                        remove(key, mod.getDataBefore(), nodeIdent);
+                    }
+                    break;
+                case SUBTREE_MODIFIED:
+                    //NO-OP since we donot need to reconciliate on Node-updated
+                    break;
+                case WRITE:
+                    if (mod.getDataBefore() == null) {
+                        add(key, mod.getDataAfter(), nodeIdent);
+                    }
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unhandled modification type " + mod.getModificationType());
             }
         }
-        for (InstanceIdentifier<?> entryKey : createdData) {
-            final InstanceIdentifier<FlowCapableNode> nodeIdent = entryKey
-                    .firstIdentifierOf(FlowCapableNode.class);
+    }
+
+
+
+    public void remove(InstanceIdentifier<FlowCapableNode> identifier, FlowCapableNode del,
+                       InstanceIdentifier<FlowCapableNode> nodeIdent) {
+        if(compareInstanceIdentifierTail(identifier,II_TO_FLOW_CAPABLE_NODE)){
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Node removed: {}",nodeIdent.firstKeyOf(Node.class).getId().getValue());
+            }
+
             if ( ! nodeIdent.isWildcarded()) {
-                flowNodeConnected(nodeIdent);
+                flowNodeDisconnected(nodeIdent);
             }
+        }
+    }
 
-        // FIXME: just a hack to cover DS/operational dirty start
-        // if all conventional ways failed and there is update
-        if (removeData.isEmpty() && createdData.isEmpty() && updateData.size() == 1) {
-            for (Map.Entry<InstanceIdentifier<?>, DataObject> entry : updateData.entrySet()) {
-                // and only if this update covers top element (flow-capable-node)
-                if (FlowCapableNode.class.equals(entry.getKey().getTargetType())) {
-                    final InstanceIdentifier<FlowCapableNode> nodeIdent = entry.getKey()
-                            .firstIdentifierOf(FlowCapableNode.class);
-                    if (!nodeIdent.isWildcarded()) {
-                        // then force registration to local node cache and reconcile
-                        flowNodeConnected(nodeIdent, true);
-                    }
-                }
+    public void add(InstanceIdentifier<FlowCapableNode> identifier, FlowCapableNode add,
+                    InstanceIdentifier<FlowCapableNode> nodeIdent) {
+        if(compareInstanceIdentifierTail(identifier,II_TO_FLOW_CAPABLE_NODE)){
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Node added: {}",nodeIdent.firstKeyOf(Node.class).getId().getValue());
+            }
+
+            if ( ! nodeIdent.isWildcarded()) {
+                flowNodeConnected(nodeIdent);
             }
         }
     }
@@ -197,40 +219,49 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
                         connectedNode.toString());
                 reconciliationPreProcess(connectedNode);
             }
-            reconciliation(connectedNode);
+            ReconciliationTask reconciliationTask = new ReconciliationTask(connectedNode);
+            executor.execute(reconciliationTask);
         }
     }
 
-    private void reconciliation(final InstanceIdentifier<FlowCapableNode> nodeIdent) {
+    private class ReconciliationTask implements Runnable {
 
-        String sNode = nodeIdent.firstKeyOf(Node.class, NodeKey.class).getId().getValue();
-        long nDpId = getDpnIdFromNodeName(sNode);
-
-        ReadOnlyTransaction trans = provider.getReadTranaction();
-        Optional<FlowCapableNode> flowNode = Optional.absent();
+        InstanceIdentifier<FlowCapableNode> nodeIdentity;
 
-        AtomicInteger counter = new AtomicInteger();
-        //initialize the counter
-        counter.set(0);
-        try {
-            flowNode = trans.read(LogicalDatastoreType.CONFIGURATION, nodeIdent).get();
-        }
-        catch (Exception e) {
-            LOG.error("Fail with read Config/DS for Node {} !", nodeIdent, e);
+        public ReconciliationTask(final InstanceIdentifier<FlowCapableNode> nodeIdent) {
+            nodeIdentity = nodeIdent;
         }
 
-        if (flowNode.isPresent()) {
-            /* Tables - have to be pushed before groups */
-            // CHECK if while pusing the update, updateTableInput can be null to emulate a table add
-            List<TableFeatures> tableList = flowNode.get().getTableFeatures() != null
-                    ? flowNode.get().getTableFeatures() : Collections.<TableFeatures> emptyList() ;
-            for (TableFeatures tableFeaturesItem : tableList) {
-                TableFeaturesKey tableKey = tableFeaturesItem.getKey();
-                KeyedInstanceIdentifier<TableFeatures, TableFeaturesKey> tableFeaturesII
-                        = nodeIdent.child(TableFeatures.class, new TableFeaturesKey(tableKey.getTableId()));
-                provider.getTableFeaturesCommiter().update(tableFeaturesII, tableFeaturesItem, null, nodeIdent);
+        @Override
+        public void run() {
+
+            String sNode = nodeIdentity.firstKeyOf(Node.class, NodeKey.class).getId().getValue();
+            long nDpId = getDpnIdFromNodeName(sNode);
+
+            ReadOnlyTransaction trans = provider.getReadTranaction();
+            Optional<FlowCapableNode> flowNode = Optional.absent();
+
+            AtomicInteger counter = new AtomicInteger();
+            //initialize the counter
+            counter.set(0);
+            try {
+                flowNode = trans.read(LogicalDatastoreType.CONFIGURATION, nodeIdentity).get();
+            } catch (Exception e) {
+                LOG.error("Fail with read Config/DS for Node {} !", nodeIdentity, e);
             }
+            if (flowNode.isPresent()) {
+                /* Tables - have to be pushed before groups */
+                // CHECK if while pusing the update, updateTableInput can be null to emulate a table add
+                List<TableFeatures> tableList = flowNode.get().getTableFeatures() != null
+                        ? flowNode.get().getTableFeatures() : Collections.emptyList();
+                for (TableFeatures tableFeaturesItem : tableList) {
+                    TableFeaturesKey tableKey = tableFeaturesItem.getKey();
+                    KeyedInstanceIdentifier<TableFeatures, TableFeaturesKey> tableFeaturesII
+                            = nodeIdentity.child(TableFeatures.class, new TableFeaturesKey(tableKey.getTableId()));
+                    provider.getTableFeaturesCommiter().update(tableFeaturesII, tableFeaturesItem, null, nodeIdentity);
+                }
+
                 /* Groups - have to be first */
                 List<Group> groups = flowNode.get().getGroup() != null
                         ? flowNode.get().getGroup() : Collections.<Group>emptyList();
@@ -241,10 +272,10 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
                 List<Group> suspectedGroups = new ArrayList<>();
 
                 while ((!(toBeInstalledGroups.isEmpty()) || !(suspectedGroups.isEmpty())) &&
-                        (counter.get()<=provider.getConfiguration().getReconciliationRetryCount())) { //also check if the counter has not crossed the threshold
+                        (counter.get() <= provider.getConfiguration().getReconciliationRetryCount())) { //also check if the counter has not crossed the threshold
 
-                    if(toBeInstalledGroups.isEmpty() && ! suspectedGroups.isEmpty()){
-                        LOG.error("These Groups are pointing to node-connectors that are not up yet {}",suspectedGroups.toString());
+                    if (toBeInstalledGroups.isEmpty() && !suspectedGroups.isEmpty()) {
+                        LOG.error("These Groups are pointing to node-connectors that are not up yet {}", suspectedGroups.toString());
                         toBeInstalledGroups.addAll(suspectedGroups);
                         break;
                     }
@@ -255,27 +286,27 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
                         boolean okToInstall = true;
                         for (Bucket bucket : group.getBuckets().getBucket()) {
                             for (Action action : bucket.getAction()) {
-                        //chained-port
+                                //chained-port
                                 if (action.getAction().getImplementedInterface().getName()
-                                        .equals("org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.OutputActionCase")){
-                                    String nodeConnectorUri = ((OutputActionCase)(action.getAction()))
+                                        .equals("org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.OutputActionCase")) {
+                                    String nodeConnectorUri = ((OutputActionCase) (action.getAction()))
                                             .getOutputAction().getOutputNodeConnector().getValue();
-                                    LOG.warn("Installing the group for node connector {}",nodeConnectorUri);
+                                    LOG.warn("Installing the group for node connector {}", nodeConnectorUri);
                                     //check if the nodeconnector is there in the multimap
                                     boolean isPresent = provider.getFlowNodeConnectorInventoryTranslatorImpl()
                                             .isNodeConnectorUpdated(nDpId, nodeConnectorUri);
                                     //if yes set okToInstall = true
-                                    if(isPresent){
-                                        break;
+                                    if (isPresent) {
+                                        break;
                                     }//else put it in a different list and still set okToInstall = true
                                     else {
                                         suspectedGroups.add(group);
                                         LOG.error("Not yet received the node-connector updated for {} " +
-                                                "for the group with id {}",nodeConnectorUri,group.getGroupId().toString());
-                                        break;
+                                                "for the group with id {}", nodeConnectorUri, group.getGroupId().toString());
+                                        break;
                                     }
@@ -290,21 +321,20 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
                                 }
                             }
                         }
-                    if (!okToInstall){
+                        if (!okToInstall) {
                             //increment retry counter value
                             counter.incrementAndGet();
                             break;
                         }
-                    }
 
                         if (okToInstall) {
                             final KeyedInstanceIdentifier<Group, GroupKey> groupIdent =
-                                nodeIdent.child(Group.class, group.getKey());
-                        this.provider.getGroupCommiter().add(groupIdent, group, nodeIdent);
+                                    nodeIdentity.child(Group.class, group.getKey());
+                            provider.getGroupCommiter().add(groupIdent, group, nodeIdentity);
                             alreadyInstalledGroupids.add(group.getGroupId().getValue());
                             iterator.remove();
                             // resetting the counter to zero
@@ -314,48 +344,48 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
                     }
 
                 /* installation of suspected groups*/
-                if(!toBeInstalledGroups.isEmpty()){
-                    for(Group group :toBeInstalledGroups){
-                        LOG.error("Installing the group {} finally although the port is not up after checking for {} times "
-                                ,group.getGroupId().toString(),provider.getConfiguration().getReconciliationRetryCount());
-                        final KeyedInstanceIdentifier<Group, GroupKey> groupIdent =
-                                nodeIdent.child(Group.class, group.getKey());
-                        this.provider.getGroupCommiter().add(groupIdent, group, nodeIdent);
+                if (!toBeInstalledGroups.isEmpty()) {
+                    for (Group group : toBeInstalledGroups) {
+                        LOG.error("Installing the group {} finally although the port is not up after checking for {} times "
+                                , group.getGroupId().toString(), provider.getConfiguration().getReconciliationRetryCount());
+                        final KeyedInstanceIdentifier<Group, GroupKey> groupIdent =
+                                nodeIdentity.child(Group.class, group.getKey());
+                        provider.getGroupCommiter().add(groupIdent, group, nodeIdentity);
+                    }
                 }
-                }
                 /* Meters */
-            List<Meter> meters = flowNode.get().getMeter() != null
-                    ? flowNode.get().getMeter() : Collections.<Meter> emptyList();
-            for (Meter meter : meters) {
-                final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent =
-                        nodeIdent.child(Meter.class, meter.getKey());
-                this.provider.getMeterCommiter().add(meterIdent, meter, nodeIdent);
-            }
+                List<Meter> meters = flowNode.get().getMeter() != null
+                        ? flowNode.get().getMeter() : Collections.emptyList();
+                for (Meter meter : meters) {
+                    final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent =
+                            nodeIdentity.child(Meter.class, meter.getKey());
+                    provider.getMeterCommiter().add(meterIdent, meter, nodeIdentity);
+                }
                 /* Flows */
-            List<Table> tables = flowNode.get().getTable() != null
-                    ? flowNode.get().getTable() : Collections.<Table> emptyList();
-            for (Table table : tables) {
-                final KeyedInstanceIdentifier<Table, TableKey> tableIdent =
-                        nodeIdent.child(Table.class, table.getKey());
-                List<Flow> flows = table.getFlow() != null ? table.getFlow() : Collections.<Flow> emptyList();
-                for (Flow flow : flows) {
-                    final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent =
-                            tableIdent.child(Flow.class, flow.getKey());
-                    this.provider.getFlowCommiter().add(flowIdent, flow, nodeIdent);
+                List<Table> tables = flowNode.get().getTable() != null
+                        ? flowNode.get().getTable() : Collections.<Table>emptyList();
+                for (Table table : tables) {
+                    final KeyedInstanceIdentifier<Table, TableKey> tableIdent =
+                            nodeIdentity.child(Table.class, table.getKey());
+                    List<Flow> flows = table.getFlow() != null ? table.getFlow() : Collections.emptyList();
+                    for (Flow flow : flows) {
+                        final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent =
+                                tableIdent.child(Flow.class, flow.getKey());
+                        provider.getFlowCommiter().add(flowIdent, flow, nodeIdentity);
+                    }
                 }
             }
-        }
             /* clean transaction */
-        trans.close();
+            trans.close();
+        }
     }
 
-    private long getDpnIdFromNodeName(String nodeName) {
+    private long getDpnIdFromNodeName(String nodeName) {
         String dpId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1);
-        return Long.parseLong(dpId);
-    }
+        return Long.parseLong(dpId);
+    }
 
     private void reconciliationPreProcess(final InstanceIdentifier<FlowCapableNode> nodeIdent) {
-
         List<InstanceIdentifier<StaleFlow>> staleFlowsToBeBulkDeleted = Lists.newArrayList();
         List<InstanceIdentifier<StaleGroup>> staleGroupsToBeBulkDeleted = Lists.newArrayList();
         List<InstanceIdentifier<StaleMeter>> staleMetersToBeBulkDeleted = Lists.newArrayList();
@@ -534,8 +564,9 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
     }
 
-
-
-
+    private boolean compareInstanceIdentifierTail(InstanceIdentifier<?> identifier1,
+                                                  InstanceIdentifier<?> identifier2) {
+        return Iterables.getLast(identifier1.getPathArguments()).equals(Iterables.getLast(identifier2.getPathArguments()));
+    }
 }
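
Reviewer note (illustrative, not part of the patch): this change does two things at once. It replaces the deprecated DataChangeListener registration with the DataTreeChangeListener API, and it moves the reconciliation work into a ReconciliationTask that runs on a small fixed thread pool instead of the listener callback thread. The sketch below isolates the first of those patterns, registration and dispatch against a wildcarded FlowCapableNode path, assuming the same Beryllium-era controller MD-SAL binding APIs this file already imports; the class name FlowCapableNodeListenerSketch and its methods are hypothetical and exist only for illustration.

// Illustrative sketch only; not part of FlowNodeReconciliationImpl.
// Shows DataTreeChangeListener registration and dispatch for FlowCapableNode,
// assuming the Beryllium-era controller MD-SAL binding APIs used by the patch above.
import java.util.Collection;

import javax.annotation.Nonnull;

import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;

public class FlowCapableNodeListenerSketch implements DataTreeChangeListener<FlowCapableNode>, AutoCloseable {

    private final ListenerRegistration<FlowCapableNodeListenerSketch> registration;

    public FlowCapableNodeListenerSketch(final DataBroker dataBroker) {
        // Wildcarded path to every FlowCapableNode augmentation in the operational datastore.
        final InstanceIdentifier<FlowCapableNode> path = InstanceIdentifier.create(Nodes.class)
                .child(Node.class).augmentation(FlowCapableNode.class);
        final DataTreeIdentifier<FlowCapableNode> treeId =
                new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, path);
        // Single registration; replaces registerDataChangeListener(..., DataChangeScope.BASE).
        registration = dataBroker.registerDataTreeChangeListener(treeId, this);
    }

    @Override
    public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<FlowCapableNode>> changes) {
        for (DataTreeModification<FlowCapableNode> change : changes) {
            switch (change.getRootNode().getModificationType()) {
                case WRITE:
                    // getDataBefore() == null here means the node just appeared (connect).
                    break;
                case DELETE:
                    // The FlowCapableNode augmentation is gone (disconnect).
                    break;
                case SUBTREE_MODIFIED:
                default:
                    // Updates below the node are not interesting for reconciliation.
                    break;
            }
        }
    }

    @Override
    public void close() throws Exception {
        registration.close();
    }
}

This is the same connect/disconnect classification the patch performs in onDataTreeChanged, without the old created/removed/updated map handling of the DataChangeListener API.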