X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=applications%2Fforwardingrules-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fapplications%2Ffrm%2Fimpl%2FFlowNodeReconciliationImpl.java;h=9366b2c58e7a0d105aca5a986447636d5a3b08f8;hb=cfe3a97837951ebbedb337dc988027f10c49f714;hp=e372ff557dfbc88b11c8da1e802cead2b75b5ec5;hpb=82cd91207e0dfbea3bba59537e7ff9dcaf75d895;p=openflowplugin.git diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowNodeReconciliationImpl.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowNodeReconciliationImpl.java index e372ff557d..9366b2c58e 100644 --- a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowNodeReconciliationImpl.java +++ b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/FlowNodeReconciliationImpl.java @@ -1,39 +1,48 @@ -/** - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. +/* + * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.openflowplugin.applications.frm.impl; -import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; - +import com.google.common.util.concurrent.JdkFutureAdapters; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.Callable; - -import org.opendaylight.controller.md.sal.binding.api.*; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import org.opendaylight.mdsal.binding.api.DataBroker; +import org.opendaylight.mdsal.binding.api.ReadTransaction; +import org.opendaylight.mdsal.binding.api.WriteTransaction; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.openflowplugin.api.OFConstants; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation; import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager; -import org.opendaylight.openflowplugin.common.wait.SimpleTaskRetryLooper; import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.GroupActionCase; import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.OutputActionCase; import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.Action; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterBuilder; @@ -48,226 +57,300 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.ta import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlow; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlowKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.StaleGroup; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.StaleGroupKey; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.Messages; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleAddFlowCaseBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleAddGroupCaseBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCaseBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.add.flow._case.AddFlowCaseDataBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.add.group._case.AddGroupCaseDataBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.group._case.RemoveGroupCaseDataBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleControlType; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState; import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures; import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeaturesKey; -import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; +import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; - - /** - * forwardingrules-manager - * org.opendaylight.openflowplugin.applications.frm - * - * FlowNode Reconciliation Listener - * Reconciliation for a new FlowNode + * Default implementation of {@link ForwardingRulesManager}. * * @author Vaclav Demcak - * - * Created: Jun 13, 2014 */ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { private static final Logger LOG = LoggerFactory.getLogger(FlowNodeReconciliationImpl.class); - private final DataBroker dataBroker; + // The number of nanoseconds to wait for a single group to be added. + private static final long ADD_GROUP_TIMEOUT = TimeUnit.SECONDS.toNanos(3); + // The maximum number of nanoseconds to wait for completion of add-group RPCs. + private static final long MAX_ADD_GROUP_TIMEOUT = TimeUnit.SECONDS.toNanos(20); + private static final String SEPARATOR = ":"; + private static final int THREAD_POOL_SIZE = 4; + + private final DataBroker dataBroker; private final ForwardingRulesManager provider; - public static final String SEPARATOR = ":"; + private final String serviceName; + private final int priority; + private final ResultState resultState; + private final Map> futureMap = new HashMap<>(); - private ListenerRegistration listenerRegistration; + private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE); - private static final InstanceIdentifier II_TO_FLOW_CAPABLE_NODE - = InstanceIdentifier.builder(Nodes.class) - .child(Node.class) - .augmentation(FlowCapableNode.class) - .build(); + private final SalBundleService salBundleService; - public FlowNodeReconciliationImpl (final ForwardingRulesManager manager, final DataBroker db) { + private static final AtomicLong BUNDLE_ID = new AtomicLong(); + private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true); + + public FlowNodeReconciliationImpl(final ForwardingRulesManager manager, final DataBroker db, + final String serviceName, final int priority, final ResultState resultState) { this.provider = Preconditions.checkNotNull(manager, "ForwardingRulesManager can not be null!"); dataBroker = Preconditions.checkNotNull(db, "DataBroker can not be null!"); - /* Build Path */ - final InstanceIdentifier flowNodeWildCardIdentifier = InstanceIdentifier.create(Nodes.class) - .child(Node.class).augmentation(FlowCapableNode.class); - - final DataTreeIdentifier treeId = - new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, flowNodeWildCardIdentifier); - - try { - SimpleTaskRetryLooper looper = new SimpleTaskRetryLooper(ForwardingRulesManagerImpl.STARTUP_LOOP_TICK, - ForwardingRulesManagerImpl.STARTUP_LOOP_MAX_RETRIES); - - listenerRegistration = looper.loopUntilNoException(new Callable>() { - @Override - public ListenerRegistration call() throws Exception { - return dataBroker.registerDataTreeChangeListener(treeId, FlowNodeReconciliationImpl.this); - } - }); - } catch (Exception e) { - LOG.warn("data listener registration failed: {}", e.getMessage()); - LOG.debug("data listener registration failed.. ", e); - throw new IllegalStateException("FlowNodeReconciliation startup fail! System needs restart.", e); - } + this.serviceName = serviceName; + this.priority = priority; + this.resultState = resultState; + salBundleService = Preconditions.checkNotNull(manager.getSalBundleService(), + "salBundleService can not be null!"); } @Override public void close() { - if (listenerRegistration != null) { - try { - listenerRegistration.close(); - } catch (Exception e) { - LOG.warn("Error by stop FRM FlowNodeReconilListener: {}", e.getMessage()); - LOG.debug("Error by stop FRM FlowNodeReconilListener..", e); - } - listenerRegistration = null; + if (executor != null) { + executor.shutdownNow(); } } @Override - public void onDataTreeChanged(@Nonnull Collection> changes) { - Preconditions.checkNotNull(changes, "Changes may not be null!"); - - for (DataTreeModification change : changes) { - final InstanceIdentifier key = change.getRootPath().getRootIdentifier(); - final DataObjectModification mod = change.getRootNode(); - final InstanceIdentifier nodeIdent = - key.firstIdentifierOf(FlowCapableNode.class); - - switch (mod.getModificationType()) { - case DELETE: - if (mod.getDataAfter() == null) { - remove(key, mod.getDataBefore(), nodeIdent); - } - break; - case SUBTREE_MODIFIED: - //NO-OP since we donot need to reconciliate on Node-updated - break; - case WRITE: - if (mod.getDataBefore() == null) { - add(key, mod.getDataAfter(), nodeIdent); - } - break; - default: - throw new IllegalArgumentException("Unhandled modification type " + mod.getModificationType()); - } + public ListenableFuture reconcileConfiguration(InstanceIdentifier connectedNode) { + LOG.info("Triggering reconciliation for device {}", connectedNode.firstKeyOf(Node.class)); + if (provider.isStaleMarkingEnabled()) { + LOG.info("Stale-Marking is ENABLED and proceeding with deletion of " + "stale-marked entities on switch {}", + connectedNode.toString()); + reconciliationPreProcess(connectedNode); + } + if (provider.isBundleBasedReconciliationEnabled()) { + BundleBasedReconciliationTask bundleBasedReconTask = new BundleBasedReconciliationTask(connectedNode); + return JdkFutureAdapters.listenInPoolThread(executor.submit(bundleBasedReconTask)); + } else { + ReconciliationTask reconciliationTask = new ReconciliationTask(connectedNode); + return JdkFutureAdapters.listenInPoolThread(executor.submit(reconciliationTask)); } } + private class BundleBasedReconciliationTask implements Callable { + final InstanceIdentifier nodeIdentity; - - public void remove(InstanceIdentifier identifier, FlowCapableNode del, - InstanceIdentifier nodeIdent) { - if(compareInstanceIdentifierTail(identifier,II_TO_FLOW_CAPABLE_NODE)){ - if (LOG.isDebugEnabled()) { - LOG.debug("Node removed: {}",nodeIdent.firstKeyOf(Node.class).getId().getValue()); - } - - if ( ! nodeIdent.isWildcarded()) { - flowNodeDisconnected(nodeIdent); - } - + BundleBasedReconciliationTask(final InstanceIdentifier nodeIdent) { + nodeIdentity = nodeIdent; } - } - public void add(InstanceIdentifier identifier, FlowCapableNode add, - InstanceIdentifier nodeIdent) { - if(compareInstanceIdentifierTail(identifier,II_TO_FLOW_CAPABLE_NODE)){ - if (LOG.isDebugEnabled()) { - LOG.debug("Node added: {}",nodeIdent.firstKeyOf(Node.class).getId().getValue()); + @Override + public Boolean call() { + String node = nodeIdentity.firstKeyOf(Node.class).getId().getValue(); + Optional flowNode = Optional.empty(); + BundleId bundleIdValue = new BundleId(BUNDLE_ID.getAndIncrement()); + BigInteger dpnId = getDpnIdFromNodeName(node); + LOG.info("Triggering bundle based reconciliation for device : {}", dpnId); + try (ReadTransaction trans = provider.getReadTransaction()) { + flowNode = trans.read(LogicalDatastoreType.CONFIGURATION, nodeIdentity).get(); + } catch (ExecutionException | InterruptedException e) { + LOG.error("Error occurred while reading the configuration data store for node {}", nodeIdentity, e); } - if ( ! nodeIdent.isWildcarded()) { - flowNodeConnected(nodeIdent); + if (flowNode.isPresent()) { + LOG.debug("FlowNode present for Datapath ID {}", dpnId); + final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class)); + + final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder().setNode(nodeRef) + .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS) + .setType(BundleControlType.ONFBCTCLOSEREQUEST).build(); + + final ControlBundleInput openBundleInput = new ControlBundleInputBuilder().setNode(nodeRef) + .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS).setType(BundleControlType.ONFBCTOPENREQUEST) + .build(); + + final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder().setNode(nodeRef) + .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS) + .setType(BundleControlType.ONFBCTCOMMITREQUEST).build(); + + final AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder() + .setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS) + .setMessages(createMessages(nodeRef, flowNode)).build(); + + /* Close previously opened bundle on the openflow switch if any */ + ListenableFuture> closeBundle + = salBundleService.controlBundle(closeBundleInput); + + /* Open a new bundle on the switch */ + ListenableFuture> openBundle = + Futures.transformAsync(closeBundle, + rpcResult -> salBundleService.controlBundle(openBundleInput), + MoreExecutors.directExecutor()); + + /* Push groups and flows via bundle add messages */ + ListenableFuture> addBundleMessagesFuture + = Futures.transformAsync(openBundle, rpcResult -> { + if (rpcResult.isSuccessful()) { + return salBundleService.addBundleMessages(addBundleMessagesInput); + } + return Futures.immediateFuture(null); + }, MoreExecutors.directExecutor()); + + /* Commit the bundle on the openflow switch */ + ListenableFuture> commitBundleFuture + = Futures.transformAsync(addBundleMessagesFuture, rpcResult -> { + if (rpcResult.isSuccessful()) { + return salBundleService.controlBundle(commitBundleInput); + } + return Futures.immediateFuture(null); + }, MoreExecutors.directExecutor()); + + /* Bundles not supported for meters */ + List meters = flowNode.get().getMeter() != null ? flowNode.get().getMeter() + : Collections.emptyList(); + Futures.transformAsync(commitBundleFuture, + rpcResult -> { + if (rpcResult.isSuccessful()) { + for (Meter meter : meters) { + final KeyedInstanceIdentifier meterIdent = nodeIdentity + .child(Meter.class, meter.key()); + provider.getMeterCommiter().add(meterIdent, meter, nodeIdentity); + } + } + return Futures.immediateFuture(null); + }, MoreExecutors.directExecutor()); + + try { + if (commitBundleFuture.get().isSuccessful()) { + LOG.debug("Completing bundle based reconciliation for device ID:{}", dpnId); + return true; + } else { + return false; + } + } catch (InterruptedException | ExecutionException e) { + LOG.error("Error while doing bundle based reconciliation for device ID:{}", nodeIdentity); + return false; + } } + LOG.error("FlowNode not present for Datapath ID {}", dpnId); + return false; } } @Override - public void flowNodeDisconnected(InstanceIdentifier disconnectedNode) { - provider.unregistrateNode(disconnectedNode); + public ListenableFuture startReconciliation(DeviceInfo node) { + InstanceIdentifier connectedNode = node.getNodeInstanceIdentifier() + .augmentation(FlowCapableNode.class); + // Clearing the group registry cache for the connected node if exists + provider.getDevicesGroupRegistry().clearNodeGroups(node.getNodeId()); + return futureMap.computeIfAbsent(node, future -> reconcileConfiguration(connectedNode)); } @Override - public void flowNodeConnected(InstanceIdentifier connectedNode) { - flowNodeConnected(connectedNode, false); + public ListenableFuture endReconciliation(DeviceInfo node) { + futureMap.computeIfPresent(node, (key, future) -> future).cancel(true); + futureMap.remove(node); + return Futures.immediateFuture(true); } - private void flowNodeConnected(InstanceIdentifier connectedNode, boolean force) { - if (force || !provider.isNodeActive(connectedNode)) { - provider.registrateNewNode(connectedNode); - - if(!provider.isNodeOwner(connectedNode)) { return; } + @Override + public int getPriority() { + return priority; + } - if (provider.getConfiguration().isStaleMarkingEnabled()) { - LOG.info("Stale-Marking is ENABLED and proceeding with deletion of stale-marked entities on switch {}", - connectedNode.toString()); - reconciliationPreProcess(connectedNode); - } - reconciliation(connectedNode); - } + @Override + public String getName() { + return serviceName; } - private void reconciliation(final InstanceIdentifier nodeIdent) { + @Override + public ResultState getResultState() { + return resultState; + } - String sNode = nodeIdent.firstKeyOf(Node.class, NodeKey.class).getId().getValue(); - long nDpId = getDpnIdFromNodeName(sNode); + private class ReconciliationTask implements Callable { - ReadOnlyTransaction trans = provider.getReadTranaction(); - Optional flowNode = Optional.absent(); + InstanceIdentifier nodeIdentity; - AtomicInteger counter = new AtomicInteger(); - //initialize the counter - counter.set(0); - try { - flowNode = trans.read(LogicalDatastoreType.CONFIGURATION, nodeIdent).get(); - } - catch (Exception e) { - LOG.error("Fail with read Config/DS for Node {} !", nodeIdent, e); + ReconciliationTask(final InstanceIdentifier nodeIdent) { + nodeIdentity = nodeIdent; } - if (flowNode.isPresent()) { - /* Tables - have to be pushed before groups */ - // CHECK if while pusing the update, updateTableInput can be null to emulate a table add - List tableList = flowNode.get().getTableFeatures() != null - ? flowNode.get().getTableFeatures() : Collections. emptyList() ; - for (TableFeatures tableFeaturesItem : tableList) { - TableFeaturesKey tableKey = tableFeaturesItem.getKey(); - KeyedInstanceIdentifier tableFeaturesII - = nodeIdent.child(TableFeatures.class, new TableFeaturesKey(tableKey.getTableId())); - provider.getTableFeaturesCommiter().update(tableFeaturesII, tableFeaturesItem, null, nodeIdent); + @Override + public Boolean call() { + String node = nodeIdentity.firstKeyOf(Node.class).getId().getValue(); + BigInteger dpnId = getDpnIdFromNodeName(node); + + Optional flowNode; + // initialize the counter + int counter = 0; + try (ReadTransaction trans = provider.getReadTransaction()) { + flowNode = trans.read(LogicalDatastoreType.CONFIGURATION, nodeIdentity).get(); + } catch (ExecutionException | InterruptedException e) { + LOG.warn("Fail with read Config/DS for Node {} !", nodeIdentity, e); + return false; } - /* Groups - have to be first */ - List groups = flowNode.get().getGroup() != null - ? flowNode.get().getGroup() : Collections.emptyList(); + if (flowNode.isPresent()) { + /* Tables - have to be pushed before groups */ + // CHECK if while pushing the update, updateTableInput can be null to emulate a + // table add + List tableList = flowNode.get().getTableFeatures() != null + ? flowNode.get().getTableFeatures() + : Collections.emptyList(); + for (TableFeatures tableFeaturesItem : tableList) { + TableFeaturesKey tableKey = tableFeaturesItem.key(); + KeyedInstanceIdentifier tableFeaturesII = nodeIdentity + .child(TableFeatures.class, new TableFeaturesKey(tableKey.getTableId())); + provider.getTableFeaturesCommiter().update(tableFeaturesII, tableFeaturesItem, null, nodeIdentity); + } + + /* Groups - have to be first */ + List groups = flowNode.get().getGroup() != null ? flowNode.get().getGroup() + : Collections.emptyList(); List toBeInstalledGroups = new ArrayList<>(); toBeInstalledGroups.addAll(groups); - List alreadyInstalledGroupids = new ArrayList<>(); - //new list for suspected groups pointing to ports .. when the ports come up late + // new list for suspected groups pointing to ports .. when the ports come up + // late List suspectedGroups = new ArrayList<>(); + Map> groupFutures = new HashMap<>(); - while ((!(toBeInstalledGroups.isEmpty()) || !(suspectedGroups.isEmpty())) && - (counter.get()<=provider.getConfiguration().getReconciliationRetryCount())) { //also check if the counter has not crossed the threshold + while ((!toBeInstalledGroups.isEmpty() || !suspectedGroups.isEmpty()) + && counter <= provider.getReconciliationRetryCount()) { // also check if the counter has not + // crossed the threshold - if(toBeInstalledGroups.isEmpty() && ! suspectedGroups.isEmpty()){ - LOG.error("These Groups are pointing to node-connectors that are not up yet {}",suspectedGroups.toString()); + if (toBeInstalledGroups.isEmpty() && !suspectedGroups.isEmpty()) { + LOG.debug("These Groups are pointing to node-connectors that are not up yet {}", + suspectedGroups.toString()); toBeInstalledGroups.addAll(suspectedGroups); break; } @@ -276,122 +359,197 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { while (iterator.hasNext()) { Group group = iterator.next(); boolean okToInstall = true; - for (Bucket bucket : group.getBuckets().getBucket()) { - for (Action action : bucket.getAction()) { - //chained-port + Buckets buckets = group.getBuckets(); + List bucketList = buckets == null ? null : buckets.getBucket(); + if (bucketList == null) { + bucketList = Collections.emptyList(); + } + for (Bucket bucket : bucketList) { + List actions = bucket.getAction(); + if (actions == null) { + actions = Collections.emptyList(); + } + for (Action action : actions) { + // chained-port if (action.getAction().getImplementedInterface().getName() - .equals("org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.OutputActionCase")){ - String nodeConnectorUri = ((OutputActionCase)(action.getAction())) - .getOutputAction().getOutputNodeConnector().getValue(); + .equals("org.opendaylight.yang.gen.v1.urn.opendaylight" + + ".action.types.rev131112.action.action.OutputActionCase")) { + String nodeConnectorUri = ((OutputActionCase) action.getAction()).getOutputAction() + .getOutputNodeConnector().getValue(); - LOG.warn("Installing the group for node connector {}",nodeConnectorUri); + LOG.debug("Installing the group for node connector {}", nodeConnectorUri); - //check if the nodeconnector is there in the multimap + // check if the nodeconnector is there in the multimap boolean isPresent = provider.getFlowNodeConnectorInventoryTranslatorImpl() - .isNodeConnectorUpdated(nDpId, nodeConnectorUri); - //if yes set okToInstall = true + .isNodeConnectorUpdated(dpnId, nodeConnectorUri); + // if yes set okToInstall = true - if(isPresent){ - break; - }//else put it in a different list and still set okToInstall = true - else { + if (isPresent) { + break; + } else { + // else put it in a different list and still set okToInstall = true suspectedGroups.add(group); - LOG.error("Not yet received the node-connector updated for {} " + - "for the group with id {}",nodeConnectorUri,group.getGroupId().toString()); - break; + LOG.debug( + "Not yet received the node-connector updated for {} " + + "for the group with id {}", + nodeConnectorUri, group.getGroupId().toString()); + break; } - - - } - //chained groups - else if (action.getAction().getImplementedInterface().getName() - .equals("org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.GroupActionCase")) { - Long groupId = ((GroupActionCase) (action.getAction())).getGroupAction().getGroupId(); - if (!alreadyInstalledGroupids.contains(groupId)) { + } else if (action.getAction().getImplementedInterface().getName() + .equals("org.opendaylight.yang.gen.v1.urn.opendaylight" + + ".action.types.rev131112.action.action.GroupActionCase")) { + // chained groups + Long groupId = ((GroupActionCase) action.getAction()).getGroupAction().getGroupId(); + ListenableFuture future = groupFutures.get(groupId); + if (future == null) { okToInstall = false; break; } + // Need to ensure that the group specified + // by group-action is already installed. + awaitGroup(node, future); } } - if (!okToInstall){ - //increment retry counter value - counter.incrementAndGet(); + if (!okToInstall) { + // increment retry counter value + counter++; break; } - - - } - - if (okToInstall) { - final KeyedInstanceIdentifier groupIdent = - nodeIdent.child(Group.class, group.getKey()); - this.provider.getGroupCommiter().add(groupIdent, group, nodeIdent); - alreadyInstalledGroupids.add(group.getGroupId().getValue()); + addGroup(groupFutures, group); iterator.remove(); // resetting the counter to zero - counter.set(0); + counter = 0; } } } - /* installation of suspected groups*/ - if(!toBeInstalledGroups.isEmpty()){ - for(Group group :toBeInstalledGroups){ - LOG.error("Installing the group {} finally although the port is not up after checking for {} times " - ,group.getGroupId().toString(),provider.getConfiguration().getReconciliationRetryCount()); - final KeyedInstanceIdentifier groupIdent = - nodeIdent.child(Group.class, group.getKey()); - this.provider.getGroupCommiter().add(groupIdent, group, nodeIdent); + /* installation of suspected groups */ + if (!toBeInstalledGroups.isEmpty()) { + for (Group group : toBeInstalledGroups) { + LOG.debug( + "Installing the group {} finally although " + + "the port is not up after checking for {} times ", + group.getGroupId().toString(), provider.getReconciliationRetryCount()); + addGroup(groupFutures, group); + } + } + /* Meters */ + List meters = flowNode.get().getMeter() != null ? flowNode.get().getMeter() + : Collections.emptyList(); + for (Meter meter : meters) { + final KeyedInstanceIdentifier meterIdent = nodeIdentity.child(Meter.class, + meter.key()); + provider.getMeterCommiter().add(meterIdent, meter, nodeIdentity); + } + + // Need to wait for all groups to be installed before adding + // flows. + awaitGroups(node, groupFutures.values()); + + /* Flows */ + List tables = flowNode.get().getTable() != null ? flowNode.get().getTable() + : Collections.
emptyList(); + for (Table table : tables) { + final KeyedInstanceIdentifier tableIdent = nodeIdentity.child(Table.class, + table.key()); + List flows = table.getFlow() != null ? table.getFlow() : Collections.emptyList(); + for (Flow flow : flows) { + final KeyedInstanceIdentifier flowIdent = tableIdent.child(Flow.class, + flow.key()); + provider.getFlowCommiter().add(flowIdent, flow, nodeIdentity); + } } } - /* Meters */ - List meters = flowNode.get().getMeter() != null - ? flowNode.get().getMeter() : Collections. emptyList(); - for (Meter meter : meters) { - final KeyedInstanceIdentifier meterIdent = - nodeIdent.child(Meter.class, meter.getKey()); - this.provider.getMeterCommiter().add(meterIdent, meter, nodeIdent); - } - /* Flows */ - List
tables = flowNode.get().getTable() != null - ? flowNode.get().getTable() : Collections.
emptyList(); - for (Table table : tables) { - final KeyedInstanceIdentifier tableIdent = - nodeIdent.child(Table.class, table.getKey()); - List flows = table.getFlow() != null ? table.getFlow() : Collections. emptyList(); - for (Flow flow : flows) { - final KeyedInstanceIdentifier flowIdent = - tableIdent.child(Flow.class, flow.getKey()); - this.provider.getFlowCommiter().add(flowIdent, flow, nodeIdent); + return true; + } + + /** + * Invoke add-group RPC, and put listenable future associated with the RPC into + * the given map. + * + * @param map + * The map to store listenable futures associated with add-group RPC. + * @param group + * The group to add. + */ + private void addGroup(Map> map, Group group) { + KeyedInstanceIdentifier groupIdent = nodeIdentity.child(Group.class, group.key()); + final Long groupId = group.getGroupId().getValue(); + ListenableFuture future = JdkFutureAdapters + .listenInPoolThread(provider.getGroupCommiter().add(groupIdent, group, nodeIdentity)); + + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Object result) { + if (LOG.isTraceEnabled()) { + LOG.trace("add-group RPC completed: node={}, id={}", + nodeIdentity.firstKeyOf(Node.class).getId().getValue(), groupId); + } + } + + @Override + public void onFailure(Throwable cause) { + LOG.debug("add-group RPC failed: node={}, id={}", + nodeIdentity.firstKeyOf(Node.class).getId().getValue(), groupId, cause); + } + }, MoreExecutors.directExecutor()); + + map.put(groupId, future); + } + + /** + * Wait for completion of add-group RPC. + * + * @param nodeId + * The identifier for the target node. + * @param future + * Future associated with add-group RPC that installs the target + * group. + */ + private void awaitGroup(String nodeId, ListenableFuture future) { + awaitGroups(nodeId, Collections.singleton(future)); + } + + /** + * Wait for completion of add-group RPCs. + * + * @param nodeId + * The identifier for the target node. + * @param futures + * A collection of futures associated with add-group RPCs. + */ + private void awaitGroups(String nodeId, Collection> futures) { + if (!futures.isEmpty()) { + long timeout = Math.min(ADD_GROUP_TIMEOUT * futures.size(), MAX_ADD_GROUP_TIMEOUT); + try { + Futures.successfulAsList(futures).get(timeout, TimeUnit.NANOSECONDS); + LOG.trace("awaitGroups() completed: node={}", nodeId); + } catch (TimeoutException | InterruptedException | ExecutionException e) { + LOG.debug("add-group RPCs did not complete: node={}", nodeId); } } } - /* clean transaction */ - trans.close(); } - private long getDpnIdFromNodeName(String nodeName) { - String dpId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1); - return Long.parseLong(dpId); - } - private void reconciliationPreProcess(final InstanceIdentifier nodeIdent) { + private BigInteger getDpnIdFromNodeName(String nodeName) { + String dpId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1); + return new BigInteger(dpId); + } + private void reconciliationPreProcess(final InstanceIdentifier nodeIdent) { List> staleFlowsToBeBulkDeleted = Lists.newArrayList(); List> staleGroupsToBeBulkDeleted = Lists.newArrayList(); List> staleMetersToBeBulkDeleted = Lists.newArrayList(); + Optional flowNode = Optional.empty(); - ReadOnlyTransaction trans = provider.getReadTranaction(); - Optional flowNode = Optional.absent(); - - try { + try (ReadTransaction trans = provider.getReadTransaction()) { flowNode = trans.read(LogicalDatastoreType.CONFIGURATION, nodeIdent).get(); - } - catch (Exception e) { - LOG.error("Reconciliation Pre-Processing Fail with read Config/DS for Node {} !", nodeIdent, e); + } catch (ExecutionException | InterruptedException e) { + LOG.warn("Reconciliation Pre-Processing Fail with read Config/DS for Node {} !", nodeIdent, e); } if (flowNode.isPresent()) { @@ -399,20 +557,20 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { LOG.debug("Proceeding with deletion of stale-marked Flows on switch {} using Openflow interface", nodeIdent.toString()); /* Stale-Flows - Stale-marked Flows have to be removed first for safety */ - List
tables = flowNode.get().getTable() != null - ? flowNode.get().getTable() : Collections.
emptyList(); + List
tables = flowNode.get().getTable() != null ? flowNode.get().getTable() + : Collections.
emptyList(); for (Table table : tables) { - final KeyedInstanceIdentifier tableIdent = - nodeIdent.child(Table.class, table.getKey()); - List staleFlows = table.getStaleFlow() != null ? table.getStaleFlow() : Collections. emptyList(); + final KeyedInstanceIdentifier tableIdent = nodeIdent.child(Table.class, + table.key()); + List staleFlows = table.getStaleFlow() != null ? table.getStaleFlow() + : Collections.emptyList(); for (StaleFlow staleFlow : staleFlows) { FlowBuilder flowBuilder = new FlowBuilder(staleFlow); Flow toBeDeletedFlow = flowBuilder.setId(staleFlow.getId()).build(); - final KeyedInstanceIdentifier flowIdent = - tableIdent.child(Flow.class, toBeDeletedFlow.getKey()); - + final KeyedInstanceIdentifier flowIdent = tableIdent.child(Flow.class, + toBeDeletedFlow.key()); this.provider.getFlowCommiter().remove(flowIdent, toBeDeletedFlow, nodeIdent); @@ -420,23 +578,23 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { } } - LOG.debug("Proceeding with deletion of stale-marked Groups for switch {} using Openflow interface", nodeIdent.toString()); - // TODO: Should we collate the futures of RPC-calls to be sure that groups are Flows are fully deleted + // TODO: Should we collate the futures of RPC-calls to be sure that groups are + // Flows are fully deleted // before attempting to delete groups - just in case there are references /* Stale-marked Groups - Can be deleted after flows */ - List staleGroups = flowNode.get().getStaleGroup() != null - ? flowNode.get().getStaleGroup() : Collections. emptyList(); + List staleGroups = flowNode.get().getStaleGroup() != null ? flowNode.get().getStaleGroup() + : Collections.emptyList(); for (StaleGroup staleGroup : staleGroups) { GroupBuilder groupBuilder = new GroupBuilder(staleGroup); Group toBeDeletedGroup = groupBuilder.setGroupId(staleGroup.getGroupId()).build(); - final KeyedInstanceIdentifier groupIdent = - nodeIdent.child(Group.class, toBeDeletedGroup.getKey()); + final KeyedInstanceIdentifier groupIdent = nodeIdent.child(Group.class, + toBeDeletedGroup.key()); this.provider.getGroupCommiter().remove(groupIdent, toBeDeletedGroup, nodeIdent); @@ -446,17 +604,16 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { LOG.debug("Proceeding with deletion of stale-marked Meters for switch {} using Openflow interface", nodeIdent.toString()); /* Stale-marked Meters - can be deleted anytime - so least priority */ - List staleMeters = flowNode.get().getStaleMeter() != null - ? flowNode.get().getStaleMeter() : Collections. emptyList(); + List staleMeters = flowNode.get().getStaleMeter() != null ? flowNode.get().getStaleMeter() + : Collections.emptyList(); for (StaleMeter staleMeter : staleMeters) { MeterBuilder meterBuilder = new MeterBuilder(staleMeter); Meter toBeDeletedMeter = meterBuilder.setMeterId(staleMeter.getMeterId()).build(); - final KeyedInstanceIdentifier meterIdent = - nodeIdent.child(Meter.class, toBeDeletedMeter.getKey()); - + final KeyedInstanceIdentifier meterIdent = nodeIdent.child(Meter.class, + toBeDeletedMeter.key()); this.provider.getMeterCommiter().remove(meterIdent, toBeDeletedMeter, nodeIdent); @@ -464,102 +621,129 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation { } } - /* clean transaction */ - trans.close(); LOG.debug("Deleting all stale-marked flows/groups/meters of for switch {} in Configuration DS", nodeIdent.toString()); - // Now, do the bulk deletions - deleteDSStaleFlows(staleFlowsToBeBulkDeleted); + // Now, do the bulk deletions + deleteDSStaleFlows(staleFlowsToBeBulkDeleted); deleteDSStaleGroups(staleGroupsToBeBulkDeleted); deleteDSStaleMeters(staleMetersToBeBulkDeleted); - } - - private void deleteDSStaleFlows(List> flowsForBulkDelete){ - ImmutableList.Builder> builder = ImmutableList.builder(); - ImmutableList> bulkDelFlows = builder.addAll(flowsForBulkDelete.iterator()).build(); - + private void deleteDSStaleFlows(List> flowsForBulkDelete) { WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction(); - for (InstanceIdentifier staleFlowIId : flowsForBulkDelete){ + for (InstanceIdentifier staleFlowIId : flowsForBulkDelete) { writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, staleFlowIId); } - CheckedFuture submitFuture = writeTransaction.submit(); + FluentFuture submitFuture = writeTransaction.commit(); handleStaleEntityDeletionResultFuture(submitFuture); } - private void deleteDSStaleGroups(List> groupsForBulkDelete){ - ImmutableList.Builder> builder = ImmutableList.builder(); - ImmutableList> bulkDelGroups = builder.addAll(groupsForBulkDelete.iterator()).build(); - + private void deleteDSStaleGroups(List> groupsForBulkDelete) { WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction(); - for (InstanceIdentifier staleGroupIId : groupsForBulkDelete){ + for (InstanceIdentifier staleGroupIId : groupsForBulkDelete) { writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, staleGroupIId); } - CheckedFuture submitFuture = writeTransaction.submit(); + FluentFuture submitFuture = writeTransaction.commit(); handleStaleEntityDeletionResultFuture(submitFuture); - } - private void deleteDSStaleMeters(List> metersForBulkDelete){ - ImmutableList.Builder> builder = ImmutableList.builder(); - ImmutableList> bulkDelGroups = builder.addAll(metersForBulkDelete.iterator()).build(); - + private void deleteDSStaleMeters(List> metersForBulkDelete) { WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction(); - for (InstanceIdentifier staleMeterIId : metersForBulkDelete){ + for (InstanceIdentifier staleMeterIId : metersForBulkDelete) { writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, staleMeterIId); } - CheckedFuture submitFuture = writeTransaction.submit(); + FluentFuture submitFuture = writeTransaction.commit(); handleStaleEntityDeletionResultFuture(submitFuture); - - } - - private InstanceIdentifier getStaleFlowInstanceIdentifier(StaleFlow staleFlow, InstanceIdentifier nodeIdent) { - return nodeIdent - .child(Table.class, new TableKey(staleFlow.getTableId())) - .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlow.class, - new StaleFlowKey(new FlowId(staleFlow.getId()))); + private InstanceIdentifier getStaleFlowInstanceIdentifier( + StaleFlow staleFlow, InstanceIdentifier nodeIdent) { + return nodeIdent.child(Table.class, new TableKey(staleFlow.getTableId())).child( + org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlow.class, + new StaleFlowKey(new FlowId(staleFlow.getId()))); } - private InstanceIdentifier getStaleGroupInstanceIdentifier(StaleGroup staleGroup, InstanceIdentifier nodeIdent) { - return nodeIdent - .child(StaleGroup.class, new StaleGroupKey(new GroupId(staleGroup.getGroupId()))); + private InstanceIdentifier getStaleGroupInstanceIdentifier( + StaleGroup staleGroup, InstanceIdentifier nodeIdent) { + return nodeIdent.child(StaleGroup.class, new StaleGroupKey(new GroupId(staleGroup.getGroupId()))); } - - private InstanceIdentifier getStaleMeterInstanceIdentifier(StaleMeter staleMeter, InstanceIdentifier nodeIdent) { - return nodeIdent - .child(StaleMeter.class, new StaleMeterKey(new MeterId(staleMeter.getMeterId()))); + private InstanceIdentifier getStaleMeterInstanceIdentifier( + StaleMeter staleMeter, InstanceIdentifier nodeIdent) { + return nodeIdent.child(StaleMeter.class, new StaleMeterKey(new MeterId(staleMeter.getMeterId()))); } - - private void handleStaleEntityDeletionResultFuture(CheckedFuture submitFuture) { - Futures.addCallback(submitFuture, new FutureCallback() { + private void handleStaleEntityDeletionResultFuture(FluentFuture submitFuture) { + submitFuture.addCallback(new FutureCallback() { @Override - public void onSuccess(Void result) { LOG.debug("Stale entity removal success"); + public void onSuccess(Object result) { + LOG.debug("Stale entity removal success"); } @Override - public void onFailure(Throwable t) { - LOG.error("Stale entity removal failed {}", t); + public void onFailure(Throwable throwable) { + LOG.debug("Stale entity removal failed {}", throwable); } - }); + }, MoreExecutors.directExecutor()); + } + private Flow getDeleteAllFlow() { + final FlowBuilder flowBuilder = new FlowBuilder(); + flowBuilder.setTableId(OFConstants.OFPTT_ALL); + return flowBuilder.build(); } + private Group getDeleteAllGroup() { + final GroupBuilder groupBuilder = new GroupBuilder(); + groupBuilder.setGroupType(GroupTypes.GroupAll); + groupBuilder.setGroupId(new GroupId(OFConstants.OFPG_ALL)); + return groupBuilder.build(); + } - private boolean compareInstanceIdentifierTail(InstanceIdentifier identifier1, - InstanceIdentifier identifier2) { - return Iterables.getLast(identifier1.getPathArguments()).equals(Iterables.getLast(identifier2.getPathArguments())); + private Messages createMessages(final NodeRef nodeRef, final Optional flowNode) { + final List messages = new ArrayList<>(); + messages.add(new MessageBuilder().setNode(nodeRef) + .setBundleInnerMessage(new BundleRemoveFlowCaseBuilder() + .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder(getDeleteAllFlow()).build()).build()) + .build()); + + messages.add(new MessageBuilder().setNode(nodeRef) + .setBundleInnerMessage(new BundleRemoveGroupCaseBuilder() + .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(getDeleteAllGroup()).build()).build()) + .build()); + + if (flowNode.get().getGroup() != null) { + for (Group gr : flowNode.get().getGroup()) { + NodeId nodeId = nodeRef.getValue().firstKeyOf(Node.class).getId(); + provider.getDevicesGroupRegistry().storeGroup(nodeId,gr.getGroupId().getValue()); + messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(new BundleAddGroupCaseBuilder() + .setAddGroupCaseData(new AddGroupCaseDataBuilder(gr).build()).build()).build()); + } + } + + if (flowNode.get().getTable() != null) { + for (Table table : flowNode.get().getTable()) { + for (Flow flow : table.getFlow()) { + messages.add( + new MessageBuilder().setNode(nodeRef) + .setBundleInnerMessage(new BundleAddFlowCaseBuilder() + .setAddFlowCaseData(new AddFlowCaseDataBuilder(flow).build()).build()) + .build()); + } + } + } + + LOG.debug("The size of the flows and group messages created in createMessage() {}", messages.size()); + return new MessagesBuilder().setMessage(messages).build(); } } -