Merge "Add NodeConfiguratorImpl enqueue trace"
[openflowplugin.git] / applications / forwardingrules-manager / src / main / java / org / opendaylight / openflowplugin / applications / frm / impl / FlowNodeReconciliationImpl.java
1 /*
2  * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.frm.impl;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.collect.Lists;
12 import com.google.common.util.concurrent.FluentFuture;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.JdkFutureAdapters;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.math.BigInteger;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.ListIterator;
25 import java.util.Map;
26 import java.util.Optional;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33 import java.util.concurrent.atomic.AtomicLong;
34 import org.opendaylight.mdsal.binding.api.DataBroker;
35 import org.opendaylight.mdsal.binding.api.ReadTransaction;
36 import org.opendaylight.mdsal.binding.api.WriteTransaction;
37 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
38 import org.opendaylight.openflowplugin.api.OFConstants;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
40 import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation;
41 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.GroupActionCase;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.OutputActionCase;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.Action;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.StaleMeter;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.StaleMeterKey;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlow;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlowKey;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.StaleGroup;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.StaleGroupKey;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInput;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInputBuilder;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleOutput;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.Messages;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleAddFlowCaseBuilder;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleAddGroupCaseBuilder;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCaseBuilder;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.add.flow._case.AddFlowCaseDataBuilder;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.add.group._case.AddGroupCaseDataBuilder;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.group._case.RemoveGroupCaseDataBuilder;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleControlType;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeaturesKey;
97 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
98 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
99 import org.opendaylight.yangtools.yang.common.RpcResult;
100 import org.slf4j.Logger;
101 import org.slf4j.LoggerFactory;
102
103 /**
104  * Default implementation of {@link FlowNodeReconciliation}.
105  *
106  * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
107  */
108 public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
109
110     private static final Logger LOG = LoggerFactory.getLogger(FlowNodeReconciliationImpl.class);
111
112     // The number of nanoseconds to wait for a single group to be added.
113     private static final long ADD_GROUP_TIMEOUT = TimeUnit.SECONDS.toNanos(3);
114
115     // The maximum number of nanoseconds to wait for completion of add-group RPCs.
116     private static final long MAX_ADD_GROUP_TIMEOUT = TimeUnit.SECONDS.toNanos(20);
117     private static final String SEPARATOR = ":";
118     private static final int THREAD_POOL_SIZE = 4;
119
120     private final DataBroker dataBroker;
121     private final ForwardingRulesManager provider;
122     private final String serviceName;
123     private final int priority;
124     private final ResultState resultState;
125     private final Map<DeviceInfo, ListenableFuture<Boolean>> futureMap = new HashMap<>();
126
127     private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
128
129     private final SalBundleService salBundleService;
130
131     private static final AtomicLong BUNDLE_ID = new AtomicLong();
132     private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
133
134     public FlowNodeReconciliationImpl(final ForwardingRulesManager manager, final DataBroker db,
135             final String serviceName, final int priority, final ResultState resultState) {
136         this.provider = Preconditions.checkNotNull(manager, "ForwardingRulesManager can not be null!");
137         dataBroker = Preconditions.checkNotNull(db, "DataBroker can not be null!");
138         this.serviceName = serviceName;
139         this.priority = priority;
140         this.resultState = resultState;
141         salBundleService = Preconditions.checkNotNull(manager.getSalBundleService(),
142                 "salBundleService can not be null!");
143     }
144
145     @Override
146     public void close() {
147         if (executor != null) {
148             executor.shutdownNow();
149         }
150     }
151
152     @Override
153     public ListenableFuture<Boolean> reconcileConfiguration(InstanceIdentifier<FlowCapableNode> connectedNode) {
154         LOG.info("Triggering reconciliation for device {}", connectedNode.firstKeyOf(Node.class));
155         if (provider.isStaleMarkingEnabled()) {
156             LOG.info("Stale-Marking is ENABLED and proceeding with deletion of " + "stale-marked entities on switch {}",
157                     connectedNode.toString());
158             reconciliationPreProcess(connectedNode);
159         }
160         if (provider.isBundleBasedReconciliationEnabled()) {
161             BundleBasedReconciliationTask bundleBasedReconTask = new BundleBasedReconciliationTask(connectedNode);
162             return JdkFutureAdapters.listenInPoolThread(executor.submit(bundleBasedReconTask));
163         } else {
164             ReconciliationTask reconciliationTask = new ReconciliationTask(connectedNode);
165             return JdkFutureAdapters.listenInPoolThread(executor.submit(reconciliationTask));
166         }
167     }
168
169     private class BundleBasedReconciliationTask implements Callable<Boolean> {
170         final InstanceIdentifier<FlowCapableNode> nodeIdentity;
171
172         BundleBasedReconciliationTask(final InstanceIdentifier<FlowCapableNode> nodeIdent) {
173             nodeIdentity = nodeIdent;
174         }
175
176         @Override
177         public Boolean call() {
178             String node = nodeIdentity.firstKeyOf(Node.class).getId().getValue();
179             Optional<FlowCapableNode> flowNode = Optional.empty();
180             BundleId bundleIdValue = new BundleId(BUNDLE_ID.getAndIncrement());
181             BigInteger dpnId = getDpnIdFromNodeName(node);
182             LOG.info("Triggering bundle based reconciliation for device : {}", dpnId);
183             try (ReadTransaction trans = provider.getReadTransaction()) {
184                 flowNode = trans.read(LogicalDatastoreType.CONFIGURATION, nodeIdentity).get();
185             } catch (ExecutionException | InterruptedException e) {
186                 LOG.error("Error occurred while reading the configuration data store for node {}", nodeIdentity, e);
187             }
188
189             if (flowNode.isPresent()) {
190                 LOG.debug("FlowNode present for Datapath ID {}", dpnId);
191                 final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
192
193                 final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
194                         .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
195                         .setType(BundleControlType.ONFBCTCLOSEREQUEST).build();
196
197                 final ControlBundleInput openBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
198                         .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS).setType(BundleControlType.ONFBCTOPENREQUEST)
199                         .build();
200
201                 final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
202                         .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
203                         .setType(BundleControlType.ONFBCTCOMMITREQUEST).build();
204
205                 final AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
206                         .setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
207                         .setMessages(createMessages(nodeRef, flowNode)).build();
208
209                 /* Close previously opened bundle on the openflow switch if any */
210                 ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle
211                         = salBundleService.controlBundle(closeBundleInput);
212
213                 /* Open a new bundle on the switch */
214                 ListenableFuture<RpcResult<ControlBundleOutput>> openBundle =
215                         Futures.transformAsync(closeBundle,
216                             rpcResult -> salBundleService.controlBundle(openBundleInput),
217                             MoreExecutors.directExecutor());
218
219                 /* Push groups and flows via bundle add messages */
220                 ListenableFuture<RpcResult<AddBundleMessagesOutput>> addBundleMessagesFuture
221                         = Futures.transformAsync(openBundle, rpcResult -> {
222                             if (rpcResult.isSuccessful()) {
223                                 return salBundleService.addBundleMessages(addBundleMessagesInput);
224                             }
225                             return Futures.immediateFuture(null);
226                         }, MoreExecutors.directExecutor());
227
228                 /* Commit the bundle on the openflow switch */
229                 ListenableFuture<RpcResult<ControlBundleOutput>> commitBundleFuture
230                         = Futures.transformAsync(addBundleMessagesFuture, rpcResult -> {
231                             if (rpcResult.isSuccessful()) {
232                                 return salBundleService.controlBundle(commitBundleInput);
233                             }
234                             return Futures.immediateFuture(null);
235                         }, MoreExecutors.directExecutor());
236
237                 /* Bundles not supported for meters */
238                 List<Meter> meters = flowNode.get().getMeter() != null ? flowNode.get().getMeter()
239                         : Collections.emptyList();
240                 Futures.transformAsync(commitBundleFuture,
241                     rpcResult -> {
242                         if (rpcResult.isSuccessful()) {
243                             for (Meter meter : meters) {
244                                 final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdentity
245                                         .child(Meter.class, meter.key());
246                                 provider.getMeterCommiter().add(meterIdent, meter, nodeIdentity);
247                             }
248                         }
249                         return Futures.immediateFuture(null);
250                     }, MoreExecutors.directExecutor());
251
252                 try {
253                     if (commitBundleFuture.get().isSuccessful()) {
254                         LOG.debug("Completing bundle based reconciliation for device ID:{}", dpnId);
255                         return true;
256                     } else {
257                         return false;
258                     }
259                 } catch (InterruptedException | ExecutionException e) {
260                     LOG.error("Error while doing bundle based reconciliation for device ID:{}", nodeIdentity);
261                     return false;
262                 }
263             }
264             LOG.error("FlowNode not present for Datapath ID {}", dpnId);
265             return false;
266         }
267     }
268
269     @Override
270     public ListenableFuture<Boolean> startReconciliation(DeviceInfo node) {
271         InstanceIdentifier<FlowCapableNode> connectedNode = node.getNodeInstanceIdentifier()
272                 .augmentation(FlowCapableNode.class);
273         // Clearing the group registry cache for the connected node if exists
274         provider.getDevicesGroupRegistry().clearNodeGroups(node.getNodeId());
275         return futureMap.computeIfAbsent(node, future -> reconcileConfiguration(connectedNode));
276     }
277
278     @Override
279     public ListenableFuture<Boolean> endReconciliation(DeviceInfo node) {
280         futureMap.computeIfPresent(node, (key, future) -> future).cancel(true);
281         futureMap.remove(node);
282         return Futures.immediateFuture(true);
283     }
284
285     @Override
286     public int getPriority() {
287         return priority;
288     }
289
290     @Override
291     public String getName() {
292         return serviceName;
293     }
294
295     @Override
296     public ResultState getResultState() {
297         return resultState;
298     }
299
300     private class ReconciliationTask implements Callable<Boolean> {
301
302         InstanceIdentifier<FlowCapableNode> nodeIdentity;
303
304         ReconciliationTask(final InstanceIdentifier<FlowCapableNode> nodeIdent) {
305             nodeIdentity = nodeIdent;
306         }
307
308         @Override
309         public Boolean call() {
310             String node = nodeIdentity.firstKeyOf(Node.class).getId().getValue();
311             BigInteger dpnId = getDpnIdFromNodeName(node);
312
313             Optional<FlowCapableNode> flowNode;
314             // initialize the counter
315             int counter = 0;
316             try (ReadTransaction trans = provider.getReadTransaction()) {
317                 flowNode = trans.read(LogicalDatastoreType.CONFIGURATION, nodeIdentity).get();
318             } catch (ExecutionException | InterruptedException e) {
319                 LOG.warn("Fail with read Config/DS for Node {} !", nodeIdentity, e);
320                 return false;
321             }
322
323             if (flowNode.isPresent()) {
324                 /* Tables - have to be pushed before groups */
325                 // CHECK if while pushing the update, updateTableInput can be null to emulate a
326                 // table add
327                 List<TableFeatures> tableList = flowNode.get().getTableFeatures() != null
328                         ? flowNode.get().getTableFeatures()
329                         : Collections.<TableFeatures>emptyList();
330                 for (TableFeatures tableFeaturesItem : tableList) {
331                     TableFeaturesKey tableKey = tableFeaturesItem.key();
332                     KeyedInstanceIdentifier<TableFeatures, TableFeaturesKey> tableFeaturesII = nodeIdentity
333                             .child(TableFeatures.class, new TableFeaturesKey(tableKey.getTableId()));
334                     provider.getTableFeaturesCommiter().update(tableFeaturesII, tableFeaturesItem, null, nodeIdentity);
335                 }
336
337                 /* Groups - have to be first */
338                 List<Group> groups = flowNode.get().getGroup() != null ? flowNode.get().getGroup()
339                         : Collections.<Group>emptyList();
340                 List<Group> toBeInstalledGroups = new ArrayList<>();
341                 toBeInstalledGroups.addAll(groups);
342                 // new list for suspected groups pointing to ports .. when the ports come up
343                 // late
344                 List<Group> suspectedGroups = new ArrayList<>();
345                 Map<Long, ListenableFuture<?>> groupFutures = new HashMap<>();
346
347                 while ((!toBeInstalledGroups.isEmpty() || !suspectedGroups.isEmpty())
348                         && counter <= provider.getReconciliationRetryCount()) { // also check if the counter has not
349                                                                                 // crossed the threshold
350
351                     if (toBeInstalledGroups.isEmpty() && !suspectedGroups.isEmpty()) {
352                         LOG.debug("These Groups are pointing to node-connectors that are not up yet {}",
353                                 suspectedGroups.toString());
354                         toBeInstalledGroups.addAll(suspectedGroups);
355                         break;
356                     }
357
358                     ListIterator<Group> iterator = toBeInstalledGroups.listIterator();
359                     while (iterator.hasNext()) {
360                         Group group = iterator.next();
361                         boolean okToInstall = true;
362                         Buckets buckets = group.getBuckets();
363                         List<Bucket> bucketList = buckets == null ? null : buckets.getBucket();
364                         if (bucketList == null) {
365                             bucketList = Collections.<Bucket>emptyList();
366                         }
367                         for (Bucket bucket : bucketList) {
368                             List<Action> actions = bucket.getAction();
369                             if (actions == null) {
370                                 actions = Collections.<Action>emptyList();
371                             }
372                             for (Action action : actions) {
373                                 // chained-port
374                                 if (action.getAction().implementedInterface().getName()
375                                         .equals("org.opendaylight.yang.gen.v1.urn.opendaylight"
376                                                 + ".action.types.rev131112.action.action.OutputActionCase")) {
377                                     String nodeConnectorUri = ((OutputActionCase) action.getAction()).getOutputAction()
378                                             .getOutputNodeConnector().getValue();
379
380                                     LOG.debug("Installing the group for node connector {}", nodeConnectorUri);
381
382                                     // check if the nodeconnector is there in the multimap
383                                     boolean isPresent = provider.getFlowNodeConnectorInventoryTranslatorImpl()
384                                             .isNodeConnectorUpdated(dpnId, nodeConnectorUri);
385                                     // if yes set okToInstall = true
386
387                                     if (isPresent) {
388                                         break;
389                                     } else {
390                                         // else put it in a different list and still set okToInstall = true
391                                         suspectedGroups.add(group);
392                                         LOG.debug(
393                                                 "Not yet received the node-connector updated for {} "
394                                                         + "for the group with id {}",
395                                                 nodeConnectorUri, group.getGroupId().toString());
396                                         break;
397                                     }
398                                 } else if (action.getAction().implementedInterface().getName()
399                                         .equals("org.opendaylight.yang.gen.v1.urn.opendaylight"
400                                                 + ".action.types.rev131112.action.action.GroupActionCase")) {
401                                     // chained groups
402                                     Long groupId = ((GroupActionCase) action.getAction()).getGroupAction().getGroupId();
403                                     ListenableFuture<?> future = groupFutures.get(groupId);
404                                     if (future == null) {
405                                         okToInstall = false;
406                                         break;
407                                     }
408                                     // Need to ensure that the group specified
409                                     // by group-action is already installed.
410                                     awaitGroup(node, future);
411                                 }
412                             }
413                             if (!okToInstall) {
414                                 // increment retry counter value
415                                 counter++;
416                                 break;
417                             }
418                         }
419                         if (okToInstall) {
420                             addGroup(groupFutures, group);
421                             iterator.remove();
422                             // resetting the counter to zero
423                             counter = 0;
424                         }
425                     }
426                 }
427
428                 /* installation of suspected groups */
429                 if (!toBeInstalledGroups.isEmpty()) {
430                     for (Group group : toBeInstalledGroups) {
431                         LOG.debug(
432                                 "Installing the group {} finally although "
433                                         + "the port is not up after checking for {} times ",
434                                 group.getGroupId().toString(), provider.getReconciliationRetryCount());
435                         addGroup(groupFutures, group);
436                     }
437                 }
438                 /* Meters */
439                 List<Meter> meters = flowNode.get().getMeter() != null ? flowNode.get().getMeter()
440                         : Collections.<Meter>emptyList();
441                 for (Meter meter : meters) {
442                     final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdentity.child(Meter.class,
443                             meter.key());
444                     provider.getMeterCommiter().add(meterIdent, meter, nodeIdentity);
445                 }
446
447                 // Need to wait for all groups to be installed before adding
448                 // flows.
449                 awaitGroups(node, groupFutures.values());
450
451                 /* Flows */
452                 List<Table> tables = flowNode.get().getTable() != null ? flowNode.get().getTable()
453                         : Collections.<Table>emptyList();
454                 for (Table table : tables) {
455                     final KeyedInstanceIdentifier<Table, TableKey> tableIdent = nodeIdentity.child(Table.class,
456                             table.key());
457                     List<Flow> flows = table.getFlow() != null ? table.getFlow() : Collections.<Flow>emptyList();
458                     for (Flow flow : flows) {
459                         final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class,
460                                 flow.key());
461                         provider.getFlowCommiter().add(flowIdent, flow, nodeIdentity);
462                     }
463                 }
464             }
465             return true;
466         }
467
468         /**
469          * Invoke add-group RPC, and put listenable future associated with the RPC into
470          * the given map.
471          *
472          * @param map
473          *            The map to store listenable futures associated with add-group RPC.
474          * @param group
475          *            The group to add.
476          */
477         private void addGroup(Map<Long, ListenableFuture<?>> map, Group group) {
478             KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdentity.child(Group.class, group.key());
479             final Long groupId = group.getGroupId().getValue();
480             ListenableFuture<?> future = JdkFutureAdapters
481                     .listenInPoolThread(provider.getGroupCommiter().add(groupIdent, group, nodeIdentity));
482
483             Futures.addCallback(future, new FutureCallback<Object>() {
484                 @Override
485                 public void onSuccess(Object result) {
486                     if (LOG.isTraceEnabled()) {
487                         LOG.trace("add-group RPC completed: node={}, id={}",
488                                 nodeIdentity.firstKeyOf(Node.class).getId().getValue(), groupId);
489                     }
490                 }
491
492                 @Override
493                 public void onFailure(Throwable cause) {
494                     LOG.debug("add-group RPC failed: node={}, id={}",
495                             nodeIdentity.firstKeyOf(Node.class).getId().getValue(), groupId, cause);
496                 }
497             }, MoreExecutors.directExecutor());
498
499             map.put(groupId, future);
500         }
501
502         /**
503          * Wait for completion of add-group RPC.
504          *
505          * @param nodeId
506          *            The identifier for the target node.
507          * @param future
508          *            Future associated with add-group RPC that installs the target
509          *            group.
510          */
511         private void awaitGroup(String nodeId, ListenableFuture<?> future) {
512             awaitGroups(nodeId, Collections.singleton(future));
513         }
514
515         /**
516          * Wait for completion of add-group RPCs.
517          *
518          * @param nodeId
519          *            The identifier for the target node.
520          * @param futures
521          *            A collection of futures associated with add-group RPCs.
522          */
523         private void awaitGroups(String nodeId, Collection<ListenableFuture<?>> futures) {
524             if (!futures.isEmpty()) {
525                 long timeout = Math.min(ADD_GROUP_TIMEOUT * futures.size(), MAX_ADD_GROUP_TIMEOUT);
526                 try {
527                     Futures.successfulAsList(futures).get(timeout, TimeUnit.NANOSECONDS);
528                     LOG.trace("awaitGroups() completed: node={}", nodeId);
529                 } catch (TimeoutException | InterruptedException | ExecutionException e) {
530                     LOG.debug("add-group RPCs did not complete: node={}", nodeId);
531                 }
532             }
533         }
534     }
535
536     private BigInteger getDpnIdFromNodeName(String nodeName) {
537
538         String dpId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1);
539         return new BigInteger(dpId);
540     }
541
542     private void reconciliationPreProcess(final InstanceIdentifier<FlowCapableNode> nodeIdent) {
543         List<InstanceIdentifier<StaleFlow>> staleFlowsToBeBulkDeleted = Lists.newArrayList();
544         List<InstanceIdentifier<StaleGroup>> staleGroupsToBeBulkDeleted = Lists.newArrayList();
545         List<InstanceIdentifier<StaleMeter>> staleMetersToBeBulkDeleted = Lists.newArrayList();
546
547         Optional<FlowCapableNode> flowNode = Optional.empty();
548
549         try (ReadTransaction trans = provider.getReadTransaction()) {
550             flowNode = trans.read(LogicalDatastoreType.CONFIGURATION, nodeIdent).get();
551         } catch (ExecutionException | InterruptedException e) {
552             LOG.warn("Reconciliation Pre-Processing Fail with read Config/DS for Node {} !", nodeIdent, e);
553         }
554
555         if (flowNode.isPresent()) {
556
557             LOG.debug("Proceeding with deletion of stale-marked Flows on switch {} using Openflow interface",
558                     nodeIdent.toString());
559             /* Stale-Flows - Stale-marked Flows have to be removed first for safety */
560             List<Table> tables = flowNode.get().getTable() != null ? flowNode.get().getTable()
561                     : Collections.<Table>emptyList();
562             for (Table table : tables) {
563                 final KeyedInstanceIdentifier<Table, TableKey> tableIdent = nodeIdent.child(Table.class,
564                         table.key());
565                 List<StaleFlow> staleFlows = table.getStaleFlow() != null ? table.getStaleFlow()
566                         : Collections.<StaleFlow>emptyList();
567                 for (StaleFlow staleFlow : staleFlows) {
568
569                     FlowBuilder flowBuilder = new FlowBuilder(staleFlow);
570                     Flow toBeDeletedFlow = flowBuilder.setId(staleFlow.getId()).build();
571
572                     final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent = tableIdent.child(Flow.class,
573                             toBeDeletedFlow.key());
574
575                     this.provider.getFlowCommiter().remove(flowIdent, toBeDeletedFlow, nodeIdent);
576
577                     staleFlowsToBeBulkDeleted.add(getStaleFlowInstanceIdentifier(staleFlow, nodeIdent));
578                 }
579             }
580
581             LOG.debug("Proceeding with deletion of stale-marked Groups for switch {} using Openflow interface",
582                     nodeIdent.toString());
583
584             // TODO: Should we collate the futures of RPC-calls to be sure that groups are
585             // Flows are fully deleted
586             // before attempting to delete groups - just in case there are references
587
588             /* Stale-marked Groups - Can be deleted after flows */
589             List<StaleGroup> staleGroups = flowNode.get().getStaleGroup() != null ? flowNode.get().getStaleGroup()
590                     : Collections.<StaleGroup>emptyList();
591             for (StaleGroup staleGroup : staleGroups) {
592
593                 GroupBuilder groupBuilder = new GroupBuilder(staleGroup);
594                 Group toBeDeletedGroup = groupBuilder.setGroupId(staleGroup.getGroupId()).build();
595
596                 final KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdent.child(Group.class,
597                         toBeDeletedGroup.key());
598
599                 this.provider.getGroupCommiter().remove(groupIdent, toBeDeletedGroup, nodeIdent);
600
601                 staleGroupsToBeBulkDeleted.add(getStaleGroupInstanceIdentifier(staleGroup, nodeIdent));
602             }
603
604             LOG.debug("Proceeding with deletion of stale-marked Meters for switch {} using Openflow interface",
605                     nodeIdent.toString());
606             /* Stale-marked Meters - can be deleted anytime - so least priority */
607             List<StaleMeter> staleMeters = flowNode.get().getStaleMeter() != null ? flowNode.get().getStaleMeter()
608                     : Collections.<StaleMeter>emptyList();
609
610             for (StaleMeter staleMeter : staleMeters) {
611
612                 MeterBuilder meterBuilder = new MeterBuilder(staleMeter);
613                 Meter toBeDeletedMeter = meterBuilder.setMeterId(staleMeter.getMeterId()).build();
614
615                 final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent = nodeIdent.child(Meter.class,
616                         toBeDeletedMeter.key());
617
618                 this.provider.getMeterCommiter().remove(meterIdent, toBeDeletedMeter, nodeIdent);
619
620                 staleMetersToBeBulkDeleted.add(getStaleMeterInstanceIdentifier(staleMeter, nodeIdent));
621             }
622
623         }
624
625         LOG.debug("Deleting all stale-marked flows/groups/meters of for switch {} in Configuration DS",
626                 nodeIdent.toString());
627         // Now, do the bulk deletions
628         deleteDSStaleFlows(staleFlowsToBeBulkDeleted);
629         deleteDSStaleGroups(staleGroupsToBeBulkDeleted);
630         deleteDSStaleMeters(staleMetersToBeBulkDeleted);
631     }
632
633     private void deleteDSStaleFlows(List<InstanceIdentifier<StaleFlow>> flowsForBulkDelete) {
634         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
635
636         for (InstanceIdentifier<StaleFlow> staleFlowIId : flowsForBulkDelete) {
637             writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, staleFlowIId);
638         }
639
640         FluentFuture<?> submitFuture = writeTransaction.commit();
641         handleStaleEntityDeletionResultFuture(submitFuture);
642     }
643
644     private void deleteDSStaleGroups(List<InstanceIdentifier<StaleGroup>> groupsForBulkDelete) {
645         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
646
647         for (InstanceIdentifier<StaleGroup> staleGroupIId : groupsForBulkDelete) {
648             writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, staleGroupIId);
649         }
650
651         FluentFuture<?> submitFuture = writeTransaction.commit();
652         handleStaleEntityDeletionResultFuture(submitFuture);
653     }
654
655     private void deleteDSStaleMeters(List<InstanceIdentifier<StaleMeter>> metersForBulkDelete) {
656         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
657
658         for (InstanceIdentifier<StaleMeter> staleMeterIId : metersForBulkDelete) {
659             writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, staleMeterIId);
660         }
661
662         FluentFuture<?> submitFuture = writeTransaction.commit();
663         handleStaleEntityDeletionResultFuture(submitFuture);
664     }
665
666     private InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight
667         .flow.inventory.rev130819.tables.table.StaleFlow> getStaleFlowInstanceIdentifier(
668             StaleFlow staleFlow, InstanceIdentifier<FlowCapableNode> nodeIdent) {
669         return nodeIdent.child(Table.class, new TableKey(staleFlow.getTableId())).child(
670                 org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlow.class,
671                 new StaleFlowKey(new FlowId(staleFlow.getId())));
672     }
673
674     private InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight
675         .group.types.rev131018.groups.StaleGroup> getStaleGroupInstanceIdentifier(
676             StaleGroup staleGroup, InstanceIdentifier<FlowCapableNode> nodeIdent) {
677         return nodeIdent.child(StaleGroup.class, new StaleGroupKey(new GroupId(staleGroup.getGroupId())));
678     }
679
680     private InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight
681         .flow.inventory.rev130819.meters.StaleMeter> getStaleMeterInstanceIdentifier(
682             StaleMeter staleMeter, InstanceIdentifier<FlowCapableNode> nodeIdent) {
683         return nodeIdent.child(StaleMeter.class, new StaleMeterKey(new MeterId(staleMeter.getMeterId())));
684     }
685
686     private void handleStaleEntityDeletionResultFuture(FluentFuture<?> submitFuture) {
687         submitFuture.addCallback(new FutureCallback<Object>() {
688             @Override
689             public void onSuccess(Object result) {
690                 LOG.debug("Stale entity removal success");
691             }
692
693             @Override
694             public void onFailure(Throwable throwable) {
695                 LOG.debug("Stale entity removal failed", throwable);
696             }
697         }, MoreExecutors.directExecutor());
698     }
699
700     private Flow getDeleteAllFlow() {
701         final FlowBuilder flowBuilder = new FlowBuilder();
702         flowBuilder.setTableId(OFConstants.OFPTT_ALL);
703         return flowBuilder.build();
704     }
705
706     private Group getDeleteAllGroup() {
707         final GroupBuilder groupBuilder = new GroupBuilder();
708         groupBuilder.setGroupType(GroupTypes.GroupAll);
709         groupBuilder.setGroupId(new GroupId(OFConstants.OFPG_ALL));
710         return groupBuilder.build();
711     }
712
713     private Messages createMessages(final NodeRef nodeRef, final Optional<FlowCapableNode> flowNode) {
714         final List<Message> messages = new ArrayList<>();
715         messages.add(new MessageBuilder().setNode(nodeRef)
716                 .setBundleInnerMessage(new BundleRemoveFlowCaseBuilder()
717                         .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder(getDeleteAllFlow()).build()).build())
718                 .build());
719
720         messages.add(new MessageBuilder().setNode(nodeRef)
721                 .setBundleInnerMessage(new BundleRemoveGroupCaseBuilder()
722                         .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(getDeleteAllGroup()).build()).build())
723                 .build());
724
725         if (flowNode.get().getGroup() != null) {
726             for (Group gr : flowNode.get().getGroup()) {
727                 NodeId nodeId = nodeRef.getValue().firstKeyOf(Node.class).getId();
728                 provider.getDevicesGroupRegistry().storeGroup(nodeId,gr.getGroupId().getValue());
729                 messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(new BundleAddGroupCaseBuilder()
730                         .setAddGroupCaseData(new AddGroupCaseDataBuilder(gr).build()).build()).build());
731             }
732         }
733
734         if (flowNode.get().getTable() != null) {
735             for (Table table : flowNode.get().getTable()) {
736                 for (Flow flow : table.getFlow()) {
737                     messages.add(
738                             new MessageBuilder().setNode(nodeRef)
739                                     .setBundleInnerMessage(new BundleAddFlowCaseBuilder()
740                                             .setAddFlowCaseData(new AddFlowCaseDataBuilder(flow).build()).build())
741                                     .build());
742                 }
743             }
744         }
745
746         LOG.debug("The size of the flows and group messages created in createMessage() {}", messages.size());
747         return new MessagesBuilder().setMessage(messages).build();
748     }
749 }