MRI version bump for Aluminium
[genius.git] / mdsalutil / mdsalutil-impl / src / main / java / org / opendaylight / genius / mdsalutil / internal / MDSALManager.java
1 /*
2  * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.genius.mdsalutil.internal;
10
11 import static org.opendaylight.infrautils.utils.concurrent.Executors.newListeningSingleThreadExecutor;
12
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.util.concurrent.FluentFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
20 import java.util.Optional;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.ExecutorService;
25 import javax.inject.Inject;
26 import javax.inject.Singleton;
27 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
28 import org.opendaylight.genius.infra.Datastore;
29 import org.opendaylight.genius.infra.Datastore.Configuration;
30 import org.opendaylight.genius.infra.RetryingManagedNewTransactionRunner;
31 import org.opendaylight.genius.infra.TypedReadTransaction;
32 import org.opendaylight.genius.infra.TypedReadWriteTransaction;
33 import org.opendaylight.genius.infra.TypedWriteTransaction;
34 import org.opendaylight.genius.mdsalutil.FlowEntity;
35 import org.opendaylight.genius.mdsalutil.FlowInfoKey;
36 import org.opendaylight.genius.mdsalutil.GroupEntity;
37 import org.opendaylight.genius.mdsalutil.GroupInfoKey;
38 import org.opendaylight.genius.mdsalutil.MDSALUtil;
39 import org.opendaylight.genius.mdsalutil.interfaces.IMdsalApiManager;
40 import org.opendaylight.infrautils.inject.AbstractLifecycle;
41 import org.opendaylight.infrautils.utils.concurrent.Executors;
42 import org.opendaylight.mdsal.binding.api.DataBroker;
43 import org.opendaylight.mdsal.binding.api.WriteTransaction;
44 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
45 import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
46 import org.opendaylight.serviceutils.tools.listener.AbstractClusteredAsyncDataTreeChangeListener;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.BucketId;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.BucketKey;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
67 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
68 import org.opendaylight.yangtools.yang.common.Uint64;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
71
72 @Singleton
73 @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
74 public class MDSALManager extends AbstractLifecycle implements IMdsalApiManager {
75
76     private static final Logger LOG = LoggerFactory.getLogger(MDSALManager.class);
77
78     private final DataBroker dataBroker;
79     private final RetryingManagedNewTransactionRunner txRunner;
80     private final FlowBatchingUtils flowBatchingUtils = new FlowBatchingUtils();
81
82     private final ConcurrentMap<FlowInfoKey, Runnable> flowMap = new ConcurrentHashMap<>();
83     private final ConcurrentMap<GroupInfoKey, Runnable> groupMap = new ConcurrentHashMap<>();
84     private final ExecutorService executorService = newListeningSingleThreadExecutor("genius-MDSALManager", LOG);
85     private final SingleTransactionDataBroker singleTxDb;
86     private final FlowListener flowListener = new FlowListener();
87     private final FlowConfigListener flowConfigListener = new FlowConfigListener();
88     private final GroupListener groupListener = new GroupListener();
89
90     /**
91      * Writes the flows and Groups to the MD SAL DataStore which will be sent to
92      * the openflowplugin for installing flows/groups on the switch. Other
93      * modules of VPN service that wants to install flows / groups on the switch
94      * uses this utility
95      *
96      * @param db
97      *            dataBroker reference
98      * @param pktProcService
99      *            PacketProcessingService for sending the packet outs
100      */
101     @Deprecated
102     public MDSALManager(DataBroker db, PacketProcessingService pktProcService) {
103         this(db);
104     }
105
106     @Inject
107     public MDSALManager(DataBroker db) {
108         this.dataBroker = db;
109         this.txRunner = new RetryingManagedNewTransactionRunner(db);
110         singleTxDb = new SingleTransactionDataBroker(dataBroker);
111         LOG.info("MDSAL Manager Initialized ");
112     }
113
114     @Override
115     protected void start() {
116         LOG.info("{} start", getClass().getSimpleName());
117
118         int batchSize = Integer.getInteger("batch.size", 1000);
119         int batchInterval = Integer.getInteger("batch.wait.time", 500);
120
121         flowBatchingUtils.registerWithBatchManager(new MdSalUtilBatchHandler(dataBroker, batchSize, batchInterval));
122     }
123
124     @Override
125     protected void stop() {
126         LOG.info("{} stop", getClass().getSimpleName());
127
128         flowListener.close();
129         flowConfigListener.close();
130         groupListener.close();
131     }
132
133     @VisibleForTesting
134     FluentFuture<Void> installFlowInternal(FlowEntity flowEntity) {
135         return addCallBackForInstallFlowAndReturn(txRunner
136             .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
137                 tx -> writeFlowEntityInternal(flowEntity, tx)));
138     }
139
140     private FluentFuture<Void> installFlowInternal(Uint64 dpId, Flow flow) {
141         return txRunner.callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
142             tx -> writeFlowInternal(dpId, flow, tx));
143     }
144
145     private static void writeFlowEntityInternal(FlowEntity flowEntity,
146             TypedWriteTransaction<Datastore.Configuration> tx) {
147         FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
148         FlowBuilder flowbld = flowEntity.getFlowBuilder();
149         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(flowEntity.getDpnId(),
150                 flowEntity.getTableId(), flowKey);
151         tx.mergeParentStructurePut(flowInstanceId, flowbld.build());
152     }
153
154     private static void writeFlowInternal(Uint64 dpId, Flow flow,
155             TypedWriteTransaction<Datastore.Configuration> tx) {
156         FlowKey flowKey = new FlowKey(new FlowId(flow.getId()));
157         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId,
158                                                             flow.getTableId().toJava(), flowKey);
159         tx.mergeParentStructurePut(flowInstanceId, flow);
160     }
161
162     @VisibleForTesting
163     FluentFuture<Void> installGroupInternal(GroupEntity groupEntity) {
164         return addCallBackForInstallGroupAndReturn(txRunner
165             .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
166                 tx -> writeGroupEntityInternal(groupEntity, tx)));
167     }
168
169     private static void writeGroupEntityInternal(GroupEntity groupEntity,
170             TypedWriteTransaction<Datastore.Configuration> tx) {
171         Group group = groupEntity.getGroupBuilder().build();
172         Node nodeDpn = buildDpnNode(groupEntity.getDpnId());
173         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupEntity.getGroupId(), nodeDpn);
174         tx.mergeParentStructurePut(groupInstanceId, group);
175     }
176
177     @VisibleForTesting
178     FluentFuture<Void> removeFlowInternal(FlowEntity flowEntity) {
179         return addCallBackForDeleteFlowAndReturn(txRunner
180                 .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
181                     tx -> deleteFlowEntityInternal(flowEntity, tx)));
182     }
183
184     private void deleteFlowEntityInternal(FlowEntity flowEntity, TypedWriteTransaction<Datastore.Configuration> tx) {
185         Uint64 dpId = flowEntity.getDpnId();
186         short tableId = flowEntity.getTableId();
187         FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
188         deleteFlow(dpId, tableId, flowKey, tx);
189     }
190
191     private void deleteFlow(Uint64 dpId, short tableId, FlowKey flowKey, WriteTransaction tx) {
192         if (flowExists(dpId, tableId, flowKey)) {
193             InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
194             tx.delete(LogicalDatastoreType.CONFIGURATION, flowInstanceId);
195         } else {
196             LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
197         }
198     }
199
200     private void deleteFlow(Uint64 dpId, short tableId, FlowKey flowKey,
201                             TypedWriteTransaction<Datastore.Configuration> tx) {
202         if (flowExists(dpId, tableId, flowKey)) {
203             InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
204             tx.delete(flowInstanceId);
205         } else {
206             LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
207         }
208     }
209
210     private FluentFuture<Void> removeFlowNewInternal(Uint64 dpnId, Flow flowEntity) {
211         LOG.debug("Remove flow {}", flowEntity);
212         return txRunner.callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
213             tx -> {
214                 FlowKey flowKey = new FlowKey(flowEntity.getId());
215                 short tableId = flowEntity.getTableId().toJava();
216                 deleteFlow(dpnId, tableId, flowKey, tx);
217             });
218     }
219
220     @VisibleForTesting
221     FluentFuture<Void> removeGroupInternal(Uint64 dpnId, long groupId) {
222         return addCallBackForInstallGroupAndReturn(txRunner
223             .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
224                 tx -> removeGroupInternal(dpnId, groupId, tx)));
225     }
226
227     private void removeGroupInternal(Uint64 dpnId, long groupId,
228                                      TypedWriteTransaction<Datastore.Configuration> tx) {
229         Node nodeDpn = buildDpnNode(dpnId);
230         if (groupExists(nodeDpn, groupId)) {
231             InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
232             tx.delete(groupInstanceId);
233         } else {
234             LOG.debug("Group {} does not exist for dpn {}", groupId, dpnId);
235         }
236     }
237
238     private static Node buildDpnNode(Uint64 dpnId) {
239         NodeId nodeId = new NodeId("openflow:" + dpnId);
240         Node nodeDpn = new NodeBuilder().setId(nodeId).withKey(new NodeKey(nodeId)).build();
241
242         return nodeDpn;
243     }
244
245     private static String getGroupKey(long groupId, Uint64 dpId) {
246         String synchronizingKey = "group-key-" + groupId + dpId;
247         return synchronizingKey.intern();
248     }
249
250     private static String getFlowKey(Uint64 dpId, short tableId, FlowKey flowKey) {
251         String synchronizingKey = "flow-key-" + dpId + tableId + flowKey;
252         return synchronizingKey.intern();
253     }
254
255     private void syncSetUpFlowInternal(FlowEntity flowEntity, boolean isRemove) {
256         if (LOG.isTraceEnabled()) {
257             LOG.trace("syncSetUpFlow for flowEntity {} ", flowEntity);
258         }
259         Flow flow = flowEntity.getFlowBuilder().build();
260         String flowId = flowEntity.getFlowId();
261         short tableId = flowEntity.getTableId();
262         Uint64 dpId = flowEntity.getDpnId();
263         FlowKey flowKey = new FlowKey(new FlowId(flowId));
264         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
265
266         if (isRemove) {
267             synchronized (getFlowKey(dpId, tableId, flowKey)) {
268                 if (flowExists(dpId, tableId, flowKey)) {
269                     MDSALUtil.syncDelete(dataBroker, LogicalDatastoreType.CONFIGURATION, flowInstanceId);
270                 } else {
271                     LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
272                 }
273             }
274         } else {
275             MDSALUtil.syncWrite(dataBroker, LogicalDatastoreType.CONFIGURATION, flowInstanceId, flow);
276         }
277     }
278
279     private void syncSetUpGroupInternal(GroupEntity groupEntity, boolean isRemove) {
280         if (LOG.isTraceEnabled()) {
281             LOG.trace("syncSetUpGroup for groupEntity {} ", groupEntity);
282         }
283         Group group = groupEntity.getGroupBuilder().build();
284         Uint64 dpId = groupEntity.getDpnId();
285         long groupId = groupEntity.getGroupId();
286         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, buildDpnNode(dpId));
287
288         if (isRemove) {
289             synchronized (getGroupKey(groupId, dpId)) {
290                 if (groupExists(dpId, groupId)) {
291                     MDSALUtil.syncDelete(dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId);
292                 } else {
293                     LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
294                 }
295             }
296         } else {
297             MDSALUtil.syncWrite(dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId, group);
298         }
299     }
300
301     private class GroupListener extends AbstractClusteredAsyncDataTreeChangeListener<Group> {
302
303         GroupListener() {
304             super(dataBroker, LogicalDatastoreType.OPERATIONAL,InstanceIdentifier.create(Nodes.class).child(Node.class)
305                     .augmentation(FlowCapableNode.class).child(Group.class),
306                     Executors.newSingleThreadExecutor("GroupListener", LOG));
307         }
308
309         @Override
310         public void remove(InstanceIdentifier<Group> identifier, Group del) {
311             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
312             executeNotifyTaskIfRequired(dpId, del);
313         }
314
315         private void executeNotifyTaskIfRequired(Uint64 dpId, Group group) {
316             GroupInfoKey groupKey = new GroupInfoKey(dpId, group.getGroupId().getValue().toJava());
317             Runnable notifyTask = groupMap.remove(groupKey);
318             if (notifyTask == null) {
319                 return;
320             }
321             executorService.execute(notifyTask);
322         }
323
324         @Override
325         public void update(InstanceIdentifier<Group> identifier, Group original, Group update) {
326             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
327             executeNotifyTaskIfRequired(dpId, update);
328         }
329
330         @Override
331         public void add(InstanceIdentifier<Group> identifier, Group add) {
332             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
333             executeNotifyTaskIfRequired(dpId, add);
334         }
335     }
336
337     private class FlowListener extends AbstractClusteredAsyncDataTreeChangeListener<Flow> {
338
339         FlowListener() {
340             super(dataBroker, LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class).child(Node.class)
341                     .augmentation(FlowCapableNode.class).child(Table.class).child(Flow.class),
342                     Executors.newSingleThreadExecutor("FlowListener", LOG));
343         }
344
345         @Override
346         public void remove(InstanceIdentifier<Flow> identifier, Flow del) {
347             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
348             notifyTaskIfRequired(dpId, del);
349         }
350
351         private void notifyTaskIfRequired(Uint64 dpId, Flow flow) {
352             FlowInfoKey flowKey = new FlowInfoKey(dpId, flow.getTableId().toJava(),
353                                                   flow.getMatch(), flow.getId().getValue());
354             Runnable notifyTask = flowMap.remove(flowKey);
355             if (notifyTask == null) {
356                 return;
357             }
358             executorService.execute(notifyTask);
359         }
360
361         @Override
362         public void update(InstanceIdentifier<Flow> identifier, Flow original, Flow update) {
363         }
364
365         @Override
366         public void add(InstanceIdentifier<Flow> identifier, Flow add) {
367             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
368             notifyTaskIfRequired(dpId, add);
369         }
370
371     }
372
373     private class FlowConfigListener extends AbstractClusteredAsyncDataTreeChangeListener<Flow> {
374         private final Logger flowLog = LoggerFactory.getLogger(FlowConfigListener.class);
375
376         FlowConfigListener() {
377             super(dataBroker, LogicalDatastoreType.OPERATIONAL,InstanceIdentifier.create(Nodes.class).child(Node.class)
378                     .augmentation(FlowCapableNode.class).child(Table.class).child(Flow.class),
379                     Executors.newSingleThreadExecutor("FlowConfigListener", LOG));
380         }
381
382         @Override
383         public void remove(InstanceIdentifier<Flow> identifier, Flow del) {
384             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
385             flowLog.trace("FlowId {} deleted from Table {} on DPN {}",
386                 del.getId().getValue(), del.getTableId(), dpId);
387         }
388
389         @Override
390         public void update(InstanceIdentifier<Flow> identifier, Flow original, Flow update) {
391         }
392
393         @Override
394         public void add(InstanceIdentifier<Flow> identifier, Flow add) {
395             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
396             flowLog.debug("FlowId {} added to Table {} on DPN {}",
397                 add.getId().getValue(), add.getTableId(), dpId);
398         }
399     }
400
401     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
402             justification = "https://github.com/spotbugs/spotbugs/issues/811")
403     private static Uint64 getDpnFromString(String dpnString) {
404         String[] split = dpnString.split(":");
405         return Uint64.valueOf(split[1]);
406     }
407
408     @Override
409     public FluentFuture<Void> installFlow(FlowEntity flowEntity) {
410         return installFlowInternal(flowEntity);
411     }
412
413     @Override
414     public FluentFuture<Void> installFlow(Uint64 dpId, Flow flowEntity) {
415         return installFlowInternal(dpId, flowEntity);
416     }
417
418     @Override
419     public FluentFuture<Void> installFlow(Uint64 dpId, FlowEntity flowEntity) {
420         return installFlowInternal(dpId, flowEntity.getFlowBuilder().build());
421     }
422
423     @Override
424     public ListenableFuture<Void> removeFlow(Uint64 dpId, short tableId, FlowId flowId) {
425         ListenableFuture<Void> future = txRunner.callWithNewWriteOnlyTransactionAndSubmit(
426             tx -> deleteFlow(dpId, tableId, new FlowKey(flowId), tx));
427
428         Futures.addCallback(future, new FutureCallback<Void>() {
429             @Override
430             public void onSuccess(final Void result) {
431                 // Committed successfully
432                 LOG.debug("Delete Flow -- Committed successfully");
433             }
434
435             @Override
436             public void onFailure(final Throwable throwable) {
437                 // Transaction failed
438                 if (throwable instanceof OptimisticLockFailedException) {
439                     // Failed because of concurrent transaction modifying same
440                     // data
441                     LOG.error("Delete Flow -- Failed because of concurrent transaction modifying same data");
442                 } else {
443                     // Some other type of TransactionCommitFailedException
444                     LOG.error("Delete Flow -- Some other type of TransactionCommitFailedException", throwable);
445                 }
446             }
447
448         }, MoreExecutors.directExecutor());
449
450         return future;
451     }
452
453     @Override
454     public FluentFuture<Void> removeFlow(Uint64 dpId, Flow flowEntity) {
455         return removeFlowNewInternal(dpId, flowEntity);
456     }
457
458     @Override
459     public FluentFuture<Void> removeFlow(FlowEntity flowEntity) {
460         return removeFlowInternal(flowEntity);
461     }
462
463     @Override
464     public void removeFlow(TypedReadWriteTransaction<Configuration> tx, FlowEntity flowEntity)
465             throws ExecutionException, InterruptedException {
466         removeFlow(tx, flowEntity.getDpnId(), flowEntity.getFlowId(), flowEntity.getTableId());
467     }
468
469     @Override
470     public void removeFlow(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, Flow flow)
471             throws ExecutionException, InterruptedException {
472         removeFlow(tx, dpId, flow.key(), flow.getTableId().toJava());
473     }
474
475     @Override
476     public void removeFlow(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, String flowId, short tableId)
477             throws ExecutionException, InterruptedException {
478         removeFlow(tx, dpId, new FlowKey(new FlowId(flowId)), tableId);
479     }
480
481     @Override
482     public void removeFlow(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, FlowKey flowKey,
483             short tableId) throws ExecutionException, InterruptedException {
484         InstanceIdentifier<Flow> flowInstanceIdentifier = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
485         if (tx.read(flowInstanceIdentifier).get().isPresent()) {
486             tx.delete(flowInstanceIdentifier);
487         }
488     }
489
490     @Override
491     public void removeGroup(GroupEntity groupEntity) {
492         removeGroupInternal(groupEntity.getDpnId(), groupEntity.getGroupId());
493     }
494
495     @Override
496     public void removeGroup(TypedReadWriteTransaction<Configuration> tx, GroupEntity groupEntity)
497             throws ExecutionException, InterruptedException {
498         removeGroup(tx, groupEntity.getDpnId(), groupEntity.getGroupId());
499     }
500
501     @Override
502     public void removeGroup(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, Group group)
503             throws ExecutionException, InterruptedException {
504         removeGroup(tx, dpId, group.getGroupId().getValue().toJava());
505     }
506
507     @Override
508     public void removeGroup(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, long groupId)
509             throws ExecutionException, InterruptedException {
510         Node nodeDpn = buildDpnNode(dpId);
511         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
512         if (tx.read(groupInstanceId).get().isPresent()) {
513             tx.delete(groupInstanceId);
514         } else {
515             LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
516         }
517     }
518
519     @Override
520     public void syncRemoveFlow(FlowEntity flowEntity, long delayTime) {
521         syncSetUpFlowInternal(flowEntity, true);
522     }
523
524     @Override
525     public void syncRemoveFlow(FlowEntity flowEntity) {
526         syncSetUpFlowInternal(flowEntity, true);
527     }
528
529     @Override
530     public void syncInstallFlow(FlowEntity flowEntity, long delayTime) {
531         syncSetUpFlowInternal(flowEntity, false);
532     }
533
534     @Override
535     public void syncInstallFlow(FlowEntity flowEntity) {
536         syncSetUpFlowInternal(flowEntity, false);
537     }
538
539     @Override
540     public void syncInstallGroup(GroupEntity groupEntity) {
541         syncSetUpGroupInternal(groupEntity, false);
542     }
543
544     @Override
545     public void syncRemoveGroup(GroupEntity groupEntity) {
546         syncSetUpGroupInternal(groupEntity, true);
547     }
548
549     @Override
550     public void addFlow(TypedWriteTransaction<Configuration> tx, FlowEntity flowEntity) {
551         addFlow(tx, flowEntity.getDpnId(), flowEntity.getFlowBuilder().build());
552     }
553
554     @Override
555     public void addFlow(TypedWriteTransaction<Configuration> tx, Uint64 dpId, Flow flow) {
556         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId,
557                                                             flow.getTableId().toJava(), flow.key());
558         tx.mergeParentStructurePut(flowInstanceId, flow);
559     }
560
561     @Override
562     public void addGroup(TypedWriteTransaction<Configuration> tx, GroupEntity groupEntity) {
563         addGroup(tx, groupEntity.getDpnId(), groupEntity.getGroupBuilder().build());
564     }
565
566     @Override
567     public void addGroup(TypedWriteTransaction<Configuration> tx, Uint64 dpId, Group group) {
568         Node nodeDpn = buildDpnNode(dpId);
569         long groupId = group.getGroupId().getValue().toJava();
570         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
571         tx.mergeParentStructurePut(groupInstanceId, group);
572     }
573
574     @Override
575     public void addBucket(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, long groupId, Bucket bucket)
576             throws ExecutionException, InterruptedException {
577         Node nodeDpn = buildDpnNode(dpId);
578         if (groupExists(tx, nodeDpn, groupId)) {
579             InstanceIdentifier<Bucket> bucketInstanceId = buildBucketInstanceIdentifier(groupId,
580                 bucket.getBucketId().getValue().toJava(), nodeDpn);
581             tx.put(bucketInstanceId, bucket);
582         }
583     }
584
585     @Override
586     public void removeBucket(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, long groupId, long bucketId)
587             throws ExecutionException, InterruptedException {
588         Node nodeDpn = buildDpnNode(dpId);
589         if (groupExists(tx, nodeDpn, groupId)) {
590             InstanceIdentifier<Bucket> bucketInstanceId = buildBucketInstanceIdentifier(groupId, bucketId, nodeDpn);
591             tx.delete(bucketInstanceId);
592         } else {
593             LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
594         }
595     }
596
597     @Override
598     public boolean groupExists(Uint64 dpId, long groupId) {
599         return groupExists(buildDpnNode(dpId), groupId);
600     }
601
602     private boolean groupExists(Node nodeDpn, long groupId) {
603         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
604         try {
605             return singleTxDb.syncReadOptional(LogicalDatastoreType.CONFIGURATION, groupInstanceId).isPresent();
606         } catch (ExecutionException | InterruptedException e) {
607             LOG.warn("Exception while reading group {} for Node {}", groupId, nodeDpn.key());
608         }
609         return false;
610     }
611
612     private static boolean groupExists(TypedReadTransaction<Configuration> tx, Node nodeDpn, long groupId)
613            throws ExecutionException, InterruptedException {
614         return tx.exists(buildGroupInstanceIdentifier(groupId, nodeDpn)).get();
615     }
616
617     private static InstanceIdentifier<Group> buildGroupInstanceIdentifier(long groupId, Node nodeDpn) {
618         InstanceIdentifier<Group> groupInstanceId = InstanceIdentifier.builder(Nodes.class)
619                 .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
620                 .child(Group.class, new GroupKey(new GroupId(groupId))).build();
621         return groupInstanceId;
622     }
623
624     private boolean flowExists(Uint64 dpId, short tableId, FlowKey flowKey) {
625         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
626         try {
627             Optional<Flow> flowOptional = singleTxDb.syncReadOptional(LogicalDatastoreType.CONFIGURATION,
628                     flowInstanceId);
629             return flowOptional.isPresent();
630         } catch (ExecutionException | InterruptedException e) {
631             LOG.warn("Exception while reading flow {} for dpn {}", flowKey, dpId);
632         }
633         return false;
634     }
635
636     private static InstanceIdentifier<Flow> buildFlowInstanceIdentifier(Uint64 dpnId, short tableId,
637             FlowKey flowKey) {
638         InstanceIdentifier<Flow> flowInstanceId = InstanceIdentifier.builder(Nodes.class)
639                 .child(Node.class, buildDpnNode(dpnId).key()).augmentation(FlowCapableNode.class)
640                 .child(Table.class, new TableKey(tableId)).child(Flow.class, flowKey).build();
641         return flowInstanceId;
642     }
643
644     private static InstanceIdentifier<Bucket> buildBucketInstanceIdentifier(long groupId, long bucketId,
645             Node nodeDpn) {
646         InstanceIdentifier<Bucket> bucketInstanceId = InstanceIdentifier.builder(Nodes.class)
647                 .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
648                 .child(Group.class, new GroupKey(new GroupId(groupId)))
649                 .child(Buckets.class)
650                 .child(Bucket.class, new BucketKey(new BucketId(bucketId))).build();
651         return bucketInstanceId;
652     }
653
654     private static FluentFuture<Void> addCallBackForDeleteFlowAndReturn(FluentFuture<Void> fluentFuture) {
655         return callBack(fluentFuture, "Delete Flow");
656     }
657
658     private static FluentFuture<Void> addCallBackForInstallFlowAndReturn(FluentFuture<Void> fluentFuture) {
659         return callBack(fluentFuture, "Install Flow");
660     }
661
662     private static FluentFuture<Void> addCallBackForInstallGroupAndReturn(FluentFuture<Void> fluentFuture) {
663         return callBack(fluentFuture, "Install Group");
664     }
665
666     // Generic for handling callbacks
667     private static FluentFuture<Void> callBack(FluentFuture<Void> fluentFuture, String log) {
668         fluentFuture.addCallback(new FutureCallback<Void>() {
669             @Override
670             public void onSuccess(final Void result) {
671                 // Committed successfully
672                 LOG.debug("{} -- Committedsuccessfully ", log);
673             }
674
675             @Override
676             public void onFailure(final Throwable throwable) {
677                 // Transaction failed
678
679                 if (throwable instanceof OptimisticLockFailedException) {
680                     // Failed because of concurrent transaction modifying same
681                     // data
682                     LOG.error("{} -- Failed because of concurrent transaction modifying same data", log);
683                 } else {
684                     // Some other type of TransactionCommitFailedException
685                     LOG.error("{} -- Some other type of TransactionCommitFailedException",log, throwable);
686                 }
687             }
688         }, MoreExecutors.directExecutor());
689         return fluentFuture;
690     }
691 }