X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=applications%2Fforwardingrules-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fapplications%2Ffrm%2Fimpl%2FFlowNodeReconciliationImpl.java;h=909f4c1423b3113cea57c6c820cccc44aff45c71;hb=eaf84d7364d73342346682d4604fe823410e977c;hp=ac4ca0b240040c992006bd60815f462f8d7e452f;hpb=4f8d6a7b05f1c2624d709489cf7cd8731250d914;p=openflowplugin.git diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowNodeReconciliationImpl.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowNodeReconciliationImpl.java index ac4ca0b240..909f4c1423 100644 --- a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowNodeReconciliationImpl.java +++ b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowNodeReconciliationImpl.java @@ -8,25 +8,30 @@ package org.opendaylight.openflowplugin.applications.frm.impl; +import java.math.BigInteger; +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; -import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; -import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; -import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; + +import java.util.concurrent.Callable; + +import org.opendaylight.controller.md.sal.binding.api.*; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation; import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager; import org.opendaylight.openflowplugin.common.wait.SimpleTaskRetryLooper; import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.GroupActionCase; +import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.OutputActionCase; import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.Action; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId; @@ -51,18 +56,18 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.StaleGroupKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId; import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures; import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeaturesKey; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; -import java.util.concurrent.Callable; +import javax.annotation.Nonnull; + /** * forwardingrules-manager @@ -82,8 +87,18 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { private final DataBroker dataBroker; private final ForwardingRulesManager provider; + public static final String SEPARATOR = ":"; + + private ListenerRegistration listenerRegistration; + + private final int THREAD_POOL_SIZE = 4; + ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE); - private ListenerRegistration listenerRegistration; + private static final InstanceIdentifier II_TO_FLOW_CAPABLE_NODE + = InstanceIdentifier.builder(Nodes.class) + .child(Node.class) + .augmentation(FlowCapableNode.class) + .build(); public FlowNodeReconciliationImpl (final ForwardingRulesManager manager, final DataBroker db) { this.provider = Preconditions.checkNotNull(manager, "ForwardingRulesManager can not be null!"); @@ -92,14 +107,17 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { final InstanceIdentifier flowNodeWildCardIdentifier = InstanceIdentifier.create(Nodes.class) .child(Node.class).augmentation(FlowCapableNode.class); + final DataTreeIdentifier treeId = + new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, flowNodeWildCardIdentifier); + + try { SimpleTaskRetryLooper looper = new SimpleTaskRetryLooper(ForwardingRulesManagerImpl.STARTUP_LOOP_TICK, ForwardingRulesManagerImpl.STARTUP_LOOP_MAX_RETRIES); - try { - listenerRegistration = looper.loopUntilNoException(new Callable>() { + + listenerRegistration = looper.loopUntilNoException(new Callable>() { @Override - public ListenerRegistration call() throws Exception { - return db.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, - flowNodeWildCardIdentifier, FlowNodeReconciliationImpl.this, DataChangeScope.BASE); + public ListenerRegistration call() throws Exception { + return dataBroker.registerDataTreeChangeListener(treeId, FlowNodeReconciliationImpl.this); } }); } catch (Exception e) { @@ -123,25 +141,58 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { } @Override - public void onDataChanged(final AsyncDataChangeEvent, DataObject> changeEvent) { - Preconditions.checkNotNull(changeEvent,"Async ChangeEvent can not be null!"); - /* All DataObjects for create */ - final Set> createdData = changeEvent.getCreatedData() != null - ? changeEvent.getCreatedData().keySet() : Collections.> emptySet(); - /* All DataObjects for remove */ - final Set> removeData = changeEvent.getRemovedPaths() != null - ? changeEvent.getRemovedPaths() : Collections.> emptySet(); - - for (InstanceIdentifier entryKey : removeData) { - final InstanceIdentifier nodeIdent = entryKey - .firstIdentifierOf(FlowCapableNode.class); + public void onDataTreeChanged(@Nonnull Collection> changes) { + Preconditions.checkNotNull(changes, "Changes may not be null!"); + + for (DataTreeModification change : changes) { + final InstanceIdentifier key = change.getRootPath().getRootIdentifier(); + final DataObjectModification mod = change.getRootNode(); + final InstanceIdentifier nodeIdent = + key.firstIdentifierOf(FlowCapableNode.class); + + switch (mod.getModificationType()) { + case DELETE: + if (mod.getDataAfter() == null) { + remove(key, mod.getDataBefore(), nodeIdent); + } + break; + case SUBTREE_MODIFIED: + //NO-OP since we donot need to reconciliate on Node-updated + break; + case WRITE: + if (mod.getDataBefore() == null) { + add(key, mod.getDataAfter(), nodeIdent); + } + break; + default: + throw new IllegalArgumentException("Unhandled modification type " + mod.getModificationType()); + } + } + } + + + + public void remove(InstanceIdentifier identifier, FlowCapableNode del, + InstanceIdentifier nodeIdent) { + if(compareInstanceIdentifierTail(identifier,II_TO_FLOW_CAPABLE_NODE)){ + if (LOG.isDebugEnabled()) { + LOG.debug("Node removed: {}",nodeIdent.firstKeyOf(Node.class).getId().getValue()); + } + if ( ! nodeIdent.isWildcarded()) { flowNodeDisconnected(nodeIdent); } + } - for (InstanceIdentifier entryKey : createdData) { - final InstanceIdentifier nodeIdent = entryKey - .firstIdentifierOf(FlowCapableNode.class); + } + + public void add(InstanceIdentifier identifier, FlowCapableNode add, + InstanceIdentifier nodeIdent) { + if(compareInstanceIdentifierTail(identifier,II_TO_FLOW_CAPABLE_NODE)){ + if (LOG.isDebugEnabled()) { + LOG.debug("Node added: {}",nodeIdent.firstKeyOf(Node.class).getId().getValue()); + } + if ( ! nodeIdent.isWildcarded()) { flowNodeConnected(nodeIdent); } @@ -155,7 +206,11 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { @Override public void flowNodeConnected(InstanceIdentifier connectedNode) { - if ( ! provider.isNodeActive(connectedNode)) { + flowNodeConnected(connectedNode, false); + } + + private void flowNodeConnected(InstanceIdentifier connectedNode, boolean force) { + if (force || !provider.isNodeActive(connectedNode)) { provider.registrateNewNode(connectedNode); if(!provider.isNodeOwner(connectedNode)) { return; } @@ -165,38 +220,48 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { connectedNode.toString()); reconciliationPreProcess(connectedNode); } - reconciliation(connectedNode); + ReconciliationTask reconciliationTask = new ReconciliationTask(connectedNode); + executor.execute(reconciliationTask); } } - private void reconciliation(final InstanceIdentifier nodeIdent) { + private class ReconciliationTask implements Runnable { - ReadOnlyTransaction trans = provider.getReadTranaction(); - Optional flowNode = Optional.absent(); + InstanceIdentifier nodeIdentity; - try { - flowNode = trans.read(LogicalDatastoreType.CONFIGURATION, nodeIdent).get(); - } - catch (Exception e) { - LOG.error("Fail with read Config/DS for Node {} !", nodeIdent, e); + public ReconciliationTask(final InstanceIdentifier nodeIdent) { + nodeIdentity = nodeIdent; } - if (flowNode.isPresent()) { + @Override + public void run() { + + String sNode = nodeIdentity.firstKeyOf(Node.class, NodeKey.class).getId().getValue(); + BigInteger nDpId = getDpnIdFromNodeName(sNode); + + ReadOnlyTransaction trans = provider.getReadTranaction(); + Optional flowNode = Optional.absent(); + + AtomicInteger counter = new AtomicInteger(); + //initialize the counter + counter.set(0); + try { + flowNode = trans.read(LogicalDatastoreType.CONFIGURATION, nodeIdentity).get(); + } catch (Exception e) { + LOG.error("Fail with read Config/DS for Node {} !", nodeIdentity, e); + } + + if (flowNode.isPresent()) { /* Tables - have to be pushed before groups */ - // CHECK if while pusing the update, updateTableInput can be null to emulate a table add - List tableList = flowNode.get().getTable() != null - ? flowNode.get().getTable() : Collections.
emptyList() ; - for (Table table : tableList) { - TableKey tableKey = table.getKey(); - KeyedInstanceIdentifier tableFeaturesII - = nodeIdent.child(Table.class, tableKey).child(TableFeatures.class, new TableFeaturesKey(tableKey.getId())); - List tableFeatures = table.getTableFeatures(); - if (tableFeatures != null) { - for (TableFeatures tableFeaturesItem : tableFeatures) { - provider.getTableFeaturesCommiter().update(tableFeaturesII, tableFeaturesItem, null, nodeIdent); - } + // CHECK if while pusing the update, updateTableInput can be null to emulate a table add + List tableList = flowNode.get().getTableFeatures() != null + ? flowNode.get().getTableFeatures() : Collections.emptyList(); + for (TableFeatures tableFeaturesItem : tableList) { + TableFeaturesKey tableKey = tableFeaturesItem.getKey(); + KeyedInstanceIdentifier tableFeaturesII + = nodeIdentity.child(TableFeatures.class, new TableFeaturesKey(tableKey.getTableId())); + provider.getTableFeaturesCommiter().update(tableFeaturesII, tableFeaturesItem, null, nodeIdentity); } - } /* Groups - have to be first */ List groups = flowNode.get().getGroup() != null @@ -204,15 +269,51 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { List toBeInstalledGroups = new ArrayList<>(); toBeInstalledGroups.addAll(groups); List alreadyInstalledGroupids = new ArrayList<>(); + //new list for suspected groups pointing to ports .. when the ports come up late + List suspectedGroups = new ArrayList<>(); + + while ((!(toBeInstalledGroups.isEmpty()) || !(suspectedGroups.isEmpty())) && + (counter.get() <= provider.getConfiguration().getReconciliationRetryCount())) { //also check if the counter has not crossed the threshold + + if (toBeInstalledGroups.isEmpty() && !suspectedGroups.isEmpty()) { + LOG.error("These Groups are pointing to node-connectors that are not up yet {}", suspectedGroups.toString()); + toBeInstalledGroups.addAll(suspectedGroups); + break; + } - while (!toBeInstalledGroups.isEmpty()) { ListIterator iterator = toBeInstalledGroups.listIterator(); while (iterator.hasNext()) { Group group = iterator.next(); boolean okToInstall = true; for (Bucket bucket : group.getBuckets().getBucket()) { for (Action action : bucket.getAction()) { + //chained-port if (action.getAction().getImplementedInterface().getName() + .equals("org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.OutputActionCase")) { + String nodeConnectorUri = ((OutputActionCase) (action.getAction())) + .getOutputAction().getOutputNodeConnector().getValue(); + + LOG.warn("Installing the group for node connector {}", nodeConnectorUri); + + //check if the nodeconnector is there in the multimap + boolean isPresent = provider.getFlowNodeConnectorInventoryTranslatorImpl() + .isNodeConnectorUpdated(nDpId, nodeConnectorUri); + //if yes set okToInstall = true + + if (isPresent) { + break; + }//else put it in a different list and still set okToInstall = true + else { + suspectedGroups.add(group); + LOG.error("Not yet received the node-connector updated for {} " + + "for the group with id {}", nodeConnectorUri, group.getGroupId().toString()); + break; + } + + + } + //chained groups + else if (action.getAction().getImplementedInterface().getName() .equals("org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.GroupActionCase")) { Long groupId = ((GroupActionCase) (action.getAction())).getGroupAction().getGroupId(); if (!alreadyInstalledGroupids.contains(groupId)) { @@ -221,52 +322,71 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { } } } - if (!okToInstall){ + if (!okToInstall) { + //increment retry counter value + counter.incrementAndGet(); break; } + + } if (okToInstall) { final KeyedInstanceIdentifier groupIdent = - nodeIdent.child(Group.class, group.getKey()); - this.provider.getGroupCommiter().add(groupIdent, group, nodeIdent); + nodeIdentity.child(Group.class, group.getKey()); + provider.getGroupCommiter().add(groupIdent, group, nodeIdentity); alreadyInstalledGroupids.add(group.getGroupId().getValue()); iterator.remove(); + // resetting the counter to zero + counter.set(0); } } } + + /* installation of suspected groups*/ + if (!toBeInstalledGroups.isEmpty()) { + for (Group group : toBeInstalledGroups) { + LOG.error("Installing the group {} finally although the port is not up after checking for {} times " + , group.getGroupId().toString(), provider.getConfiguration().getReconciliationRetryCount()); + final KeyedInstanceIdentifier groupIdent = + nodeIdentity.child(Group.class, group.getKey()); + provider.getGroupCommiter().add(groupIdent, group, nodeIdentity); + } + } /* Meters */ - List meters = flowNode.get().getMeter() != null - ? flowNode.get().getMeter() : Collections. emptyList(); - for (Meter meter : meters) { - final KeyedInstanceIdentifier meterIdent = - nodeIdent.child(Meter.class, meter.getKey()); - this.provider.getMeterCommiter().add(meterIdent, meter, nodeIdent); - } + List meters = flowNode.get().getMeter() != null + ? flowNode.get().getMeter() : Collections.emptyList(); + for (Meter meter : meters) { + final KeyedInstanceIdentifier meterIdent = + nodeIdentity.child(Meter.class, meter.getKey()); + provider.getMeterCommiter().add(meterIdent, meter, nodeIdentity); + } /* Flows */ - List
tables = flowNode.get().getTable() != null - ? flowNode.get().getTable() : Collections.
emptyList(); - for (Table table : tables) { - final KeyedInstanceIdentifier tableIdent = - nodeIdent.child(Table.class, table.getKey()); - List flows = table.getFlow() != null ? table.getFlow() : Collections. emptyList(); - for (Flow flow : flows) { - final KeyedInstanceIdentifier flowIdent = - tableIdent.child(Flow.class, flow.getKey()); - this.provider.getFlowCommiter().add(flowIdent, flow, nodeIdent); + List
tables = flowNode.get().getTable() != null + ? flowNode.get().getTable() : Collections.
emptyList(); + for (Table table : tables) { + final KeyedInstanceIdentifier tableIdent = + nodeIdentity.child(Table.class, table.getKey()); + List flows = table.getFlow() != null ? table.getFlow() : Collections.emptyList(); + for (Flow flow : flows) { + final KeyedInstanceIdentifier flowIdent = + tableIdent.child(Flow.class, flow.getKey()); + provider.getFlowCommiter().add(flowIdent, flow, nodeIdentity); + } } } - } /* clean transaction */ - trans.close(); + trans.close(); + } + } + private BigInteger getDpnIdFromNodeName(String nodeName) { + String dpId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1); + return new BigInteger(dpId); } - - private void reconciliationPreProcess(final InstanceIdentifier nodeIdent) { - List> staleFlowsToBeBulkDeleted = Lists.newArrayList(); List> staleGroupsToBeBulkDeleted = Lists.newArrayList(); List> staleMetersToBeBulkDeleted = Lists.newArrayList(); @@ -326,7 +446,7 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { final KeyedInstanceIdentifier groupIdent = nodeIdent.child(Group.class, toBeDeletedGroup.getKey()); - this.provider.getGroupCommiter().add(groupIdent, toBeDeletedGroup, nodeIdent); + this.provider.getGroupCommiter().remove(groupIdent, toBeDeletedGroup, nodeIdent); staleGroupsToBeBulkDeleted.add(getStaleGroupInstanceIdentifier(staleGroup, nodeIdent)); } @@ -346,7 +466,7 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { nodeIdent.child(Meter.class, toBeDeletedMeter.getKey()); - this.provider.getMeterCommiter().add(meterIdent, toBeDeletedMeter, nodeIdent); + this.provider.getMeterCommiter().remove(meterIdent, toBeDeletedMeter, nodeIdent); staleMetersToBeBulkDeleted.add(getStaleMeterInstanceIdentifier(staleMeter, nodeIdent)); } @@ -445,8 +565,9 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { } - - - + private boolean compareInstanceIdentifierTail(InstanceIdentifier identifier1, + InstanceIdentifier identifier2) { + return Iterables.getLast(identifier1.getPathArguments()).equals(Iterables.getLast(identifier2.getPathArguments())); + } }