1f93bb0692fe5f40d2f32c4c9668e93845fd14bc
[genius.git] / mdsalutil / mdsalutil-impl / src / main / java / org / opendaylight / genius / mdsalutil / internal / MDSALManager.java
1 /*
2  * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.genius.mdsalutil.internal;
10
11 import static org.opendaylight.infrautils.utils.concurrent.Executors.newListeningSingleThreadExecutor;
12
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.util.concurrent.FluentFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
20 import java.util.Objects;
21 import java.util.Optional;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentMap;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.ExecutorService;
26 import javax.inject.Inject;
27 import javax.inject.Singleton;
28 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
29 import org.opendaylight.genius.infra.Datastore;
30 import org.opendaylight.genius.infra.Datastore.Configuration;
31 import org.opendaylight.genius.infra.RetryingManagedNewTransactionRunner;
32 import org.opendaylight.genius.infra.TypedReadTransaction;
33 import org.opendaylight.genius.infra.TypedReadWriteTransaction;
34 import org.opendaylight.genius.infra.TypedWriteTransaction;
35 import org.opendaylight.genius.mdsalutil.FlowEntity;
36 import org.opendaylight.genius.mdsalutil.FlowInfoKey;
37 import org.opendaylight.genius.mdsalutil.GroupEntity;
38 import org.opendaylight.genius.mdsalutil.GroupInfoKey;
39 import org.opendaylight.genius.mdsalutil.MDSALUtil;
40 import org.opendaylight.genius.mdsalutil.interfaces.IMdsalApiManager;
41 import org.opendaylight.infrautils.inject.AbstractLifecycle;
42 import org.opendaylight.infrautils.utils.concurrent.Executors;
43 import org.opendaylight.infrautils.utils.concurrent.NamedLocks;
44 import org.opendaylight.infrautils.utils.concurrent.NamedSimpleReentrantLock.Acquired;
45 import org.opendaylight.mdsal.binding.api.DataBroker;
46 import org.opendaylight.mdsal.binding.api.WriteTransaction;
47 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
48 import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
49 import org.opendaylight.serviceutils.tools.listener.AbstractClusteredAsyncDataTreeChangeListener;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.BucketId;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.BucketKey;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
70 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
71 import org.opendaylight.yangtools.yang.common.Uint64;
72 import org.slf4j.Logger;
73 import org.slf4j.LoggerFactory;
74
75 @Singleton
76 @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
77 public class MDSALManager extends AbstractLifecycle implements IMdsalApiManager {
78     private static final class FlowLockKey {
79         private final Uint64 dpId;
80         private final FlowKey flowKey;
81         private final short tableId;
82
83         FlowLockKey(Uint64 dpId, short tableId, FlowKey flowKey) {
84             this.dpId = dpId;
85             this.tableId = tableId;
86             this.flowKey = flowKey;
87         }
88
89         @Override
90         public int hashCode() {
91             return 31 * Short.hashCode(tableId) + Objects.hash(dpId, flowKey);
92         }
93
94         @Override
95         public boolean equals(Object obj) {
96             if (this == obj) {
97                 return true;
98             }
99             if (!(obj instanceof FlowLockKey)) {
100                 return false;
101             }
102             final FlowLockKey other = (FlowLockKey) obj;
103             return tableId == other.tableId && Objects.equals(dpId, other.dpId)
104                     && Objects.equals(flowKey, other.flowKey);
105         }
106     }
107
108     private static final class GroupLockKey {
109         private final Uint64 dpId;
110         private final long groupId;
111
112         GroupLockKey(long groupId, Uint64 dpId) {
113             this.groupId = groupId;
114             this.dpId = dpId;
115         }
116
117         @Override
118         public int hashCode() {
119             return 31 * Long.hashCode(groupId) + Objects.hashCode(dpId);
120         }
121
122         @Override
123         public boolean equals(Object obj) {
124             if (this == obj) {
125                 return true;
126             }
127             if (!(obj instanceof GroupLockKey)) {
128                 return false;
129             }
130             final GroupLockKey other = (GroupLockKey) obj;
131             return groupId == other.groupId && Objects.equals(dpId, other.dpId);
132         }
133     }
134
135     private static final Logger LOG = LoggerFactory.getLogger(MDSALManager.class);
136     private static final NamedLocks<FlowLockKey> FLOW_LOCKS = new NamedLocks<>();
137     private static final NamedLocks<GroupLockKey> GROUP_LOCKS = new NamedLocks<>();
138
139     private final DataBroker dataBroker;
140     private final RetryingManagedNewTransactionRunner txRunner;
141     private final FlowBatchingUtils flowBatchingUtils = new FlowBatchingUtils();
142
143     private final ConcurrentMap<FlowInfoKey, Runnable> flowMap = new ConcurrentHashMap<>();
144     private final ConcurrentMap<GroupInfoKey, Runnable> groupMap = new ConcurrentHashMap<>();
145     private final ExecutorService executorService = newListeningSingleThreadExecutor("genius-MDSALManager", LOG);
146     private final SingleTransactionDataBroker singleTxDb;
147     private final FlowListener flowListener = new FlowListener();
148     private final FlowConfigListener flowConfigListener = new FlowConfigListener();
149     private final GroupListener groupListener = new GroupListener();
150
151     /**
152      * Writes the flows and Groups to the MD SAL DataStore which will be sent to
153      * the openflowplugin for installing flows/groups on the switch. Other
154      * modules of VPN service that wants to install flows / groups on the switch
155      * uses this utility
156      *
157      * @param db
158      *            dataBroker reference
159      * @param pktProcService
160      *            PacketProcessingService for sending the packet outs
161      */
162     @Deprecated
163     public MDSALManager(DataBroker db, PacketProcessingService pktProcService) {
164         this(db);
165     }
166
167     @Inject
168     public MDSALManager(DataBroker db) {
169         this.dataBroker = db;
170         this.txRunner = new RetryingManagedNewTransactionRunner(db);
171         singleTxDb = new SingleTransactionDataBroker(dataBroker);
172         LOG.info("MDSAL Manager Initialized ");
173     }
174
175     @Override
176     protected void start() {
177         LOG.info("{} start", getClass().getSimpleName());
178
179         int batchSize = Integer.getInteger("batch.size", 1000);
180         int batchInterval = Integer.getInteger("batch.wait.time", 500);
181
182         flowBatchingUtils.registerWithBatchManager(new MdSalUtilBatchHandler(dataBroker, batchSize, batchInterval));
183     }
184
185     @Override
186     protected void stop() {
187         LOG.info("{} stop", getClass().getSimpleName());
188
189         flowListener.close();
190         flowConfigListener.close();
191         groupListener.close();
192     }
193
194     @VisibleForTesting
195     FluentFuture<Void> installFlowInternal(FlowEntity flowEntity) {
196         return addCallBackForInstallFlowAndReturn(txRunner
197             .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
198                 tx -> writeFlowEntityInternal(flowEntity, tx)));
199     }
200
201     private FluentFuture<Void> installFlowInternal(Uint64 dpId, Flow flow) {
202         return txRunner.callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
203             tx -> writeFlowInternal(dpId, flow, tx));
204     }
205
206     private static void writeFlowEntityInternal(FlowEntity flowEntity,
207             TypedWriteTransaction<Datastore.Configuration> tx) {
208         FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
209         FlowBuilder flowbld = flowEntity.getFlowBuilder();
210         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(flowEntity.getDpnId(),
211                 flowEntity.getTableId(), flowKey);
212         tx.mergeParentStructurePut(flowInstanceId, flowbld.build());
213     }
214
215     private static void writeFlowInternal(Uint64 dpId, Flow flow,
216             TypedWriteTransaction<Datastore.Configuration> tx) {
217         FlowKey flowKey = new FlowKey(new FlowId(flow.getId()));
218         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId,
219                                                             flow.getTableId().toJava(), flowKey);
220         tx.mergeParentStructurePut(flowInstanceId, flow);
221     }
222
223     @VisibleForTesting
224     FluentFuture<Void> installGroupInternal(GroupEntity groupEntity) {
225         return addCallBackForInstallGroupAndReturn(txRunner
226             .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
227                 tx -> writeGroupEntityInternal(groupEntity, tx)));
228     }
229
230     private static void writeGroupEntityInternal(GroupEntity groupEntity,
231             TypedWriteTransaction<Datastore.Configuration> tx) {
232         Group group = groupEntity.getGroupBuilder().build();
233         Node nodeDpn = buildDpnNode(groupEntity.getDpnId());
234         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupEntity.getGroupId(), nodeDpn);
235         tx.mergeParentStructurePut(groupInstanceId, group);
236     }
237
238     @VisibleForTesting
239     FluentFuture<Void> removeFlowInternal(FlowEntity flowEntity) {
240         return addCallBackForDeleteFlowAndReturn(txRunner
241                 .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
242                     tx -> deleteFlowEntityInternal(flowEntity, tx)));
243     }
244
245     private void deleteFlowEntityInternal(FlowEntity flowEntity, TypedWriteTransaction<Datastore.Configuration> tx) {
246         Uint64 dpId = flowEntity.getDpnId();
247         short tableId = flowEntity.getTableId();
248         FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
249         deleteFlow(dpId, tableId, flowKey, tx);
250     }
251
252     private void deleteFlow(Uint64 dpId, short tableId, FlowKey flowKey, WriteTransaction tx) {
253         if (flowExists(dpId, tableId, flowKey)) {
254             InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
255             tx.delete(LogicalDatastoreType.CONFIGURATION, flowInstanceId);
256         } else {
257             LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
258         }
259     }
260
261     private void deleteFlow(Uint64 dpId, short tableId, FlowKey flowKey,
262                             TypedWriteTransaction<Datastore.Configuration> tx) {
263         if (flowExists(dpId, tableId, flowKey)) {
264             InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
265             tx.delete(flowInstanceId);
266         } else {
267             LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
268         }
269     }
270
271     private FluentFuture<Void> removeFlowNewInternal(Uint64 dpnId, Flow flowEntity) {
272         LOG.debug("Remove flow {}", flowEntity);
273         return txRunner.callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
274             tx -> {
275                 FlowKey flowKey = new FlowKey(flowEntity.getId());
276                 short tableId = flowEntity.getTableId().toJava();
277                 deleteFlow(dpnId, tableId, flowKey, tx);
278             });
279     }
280
281     @VisibleForTesting
282     FluentFuture<Void> removeGroupInternal(Uint64 dpnId, long groupId) {
283         return addCallBackForInstallGroupAndReturn(txRunner
284             .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
285                 tx -> removeGroupInternal(dpnId, groupId, tx)));
286     }
287
288     private void removeGroupInternal(Uint64 dpnId, long groupId,
289                                      TypedWriteTransaction<Datastore.Configuration> tx) {
290         Node nodeDpn = buildDpnNode(dpnId);
291         if (groupExists(nodeDpn, groupId)) {
292             InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
293             tx.delete(groupInstanceId);
294         } else {
295             LOG.debug("Group {} does not exist for dpn {}", groupId, dpnId);
296         }
297     }
298
299     private static Node buildDpnNode(Uint64 dpnId) {
300         NodeId nodeId = new NodeId("openflow:" + dpnId);
301         Node nodeDpn = new NodeBuilder().setId(nodeId).withKey(new NodeKey(nodeId)).build();
302
303         return nodeDpn;
304     }
305
306     private void syncSetUpFlowInternal(FlowEntity flowEntity, boolean isRemove) {
307         if (LOG.isTraceEnabled()) {
308             LOG.trace("syncSetUpFlow for flowEntity {} ", flowEntity);
309         }
310         Flow flow = flowEntity.getFlowBuilder().build();
311         String flowId = flowEntity.getFlowId();
312         short tableId = flowEntity.getTableId();
313         Uint64 dpId = flowEntity.getDpnId();
314         FlowKey flowKey = new FlowKey(new FlowId(flowId));
315         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
316
317         if (isRemove) {
318             try (Acquired lock = FLOW_LOCKS.acquire(new FlowLockKey(dpId, tableId, flowKey))) {
319                 if (flowExists(dpId, tableId, flowKey)) {
320                     MDSALUtil.syncDelete(dataBroker, LogicalDatastoreType.CONFIGURATION, flowInstanceId);
321                 } else {
322                     LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
323                 }
324             }
325         } else {
326             MDSALUtil.syncWrite(dataBroker, LogicalDatastoreType.CONFIGURATION, flowInstanceId, flow);
327         }
328     }
329
330     private void syncSetUpGroupInternal(GroupEntity groupEntity, boolean isRemove) {
331         if (LOG.isTraceEnabled()) {
332             LOG.trace("syncSetUpGroup for groupEntity {} ", groupEntity);
333         }
334         Group group = groupEntity.getGroupBuilder().build();
335         Uint64 dpId = groupEntity.getDpnId();
336         long groupId = groupEntity.getGroupId();
337         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, buildDpnNode(dpId));
338
339         if (isRemove) {
340             try (Acquired lock = GROUP_LOCKS.acquire(new GroupLockKey(groupId, dpId))) {
341                 if (groupExists(dpId, groupId)) {
342                     MDSALUtil.syncDelete(dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId);
343                 } else {
344                     LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
345                 }
346             }
347         } else {
348             MDSALUtil.syncWrite(dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId, group);
349         }
350     }
351
352     private class GroupListener extends AbstractClusteredAsyncDataTreeChangeListener<Group> {
353
354         GroupListener() {
355             super(dataBroker, LogicalDatastoreType.OPERATIONAL,InstanceIdentifier.create(Nodes.class).child(Node.class)
356                     .augmentation(FlowCapableNode.class).child(Group.class),
357                     Executors.newSingleThreadExecutor("GroupListener", LOG));
358         }
359
360         @Override
361         public void remove(InstanceIdentifier<Group> identifier, Group del) {
362             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
363             executeNotifyTaskIfRequired(dpId, del);
364         }
365
366         private void executeNotifyTaskIfRequired(Uint64 dpId, Group group) {
367             GroupInfoKey groupKey = new GroupInfoKey(dpId, group.getGroupId().getValue().toJava());
368             Runnable notifyTask = groupMap.remove(groupKey);
369             if (notifyTask == null) {
370                 return;
371             }
372             executorService.execute(notifyTask);
373         }
374
375         @Override
376         public void update(InstanceIdentifier<Group> identifier, Group original, Group update) {
377             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
378             executeNotifyTaskIfRequired(dpId, update);
379         }
380
381         @Override
382         public void add(InstanceIdentifier<Group> identifier, Group add) {
383             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
384             executeNotifyTaskIfRequired(dpId, add);
385         }
386     }
387
388     private class FlowListener extends AbstractClusteredAsyncDataTreeChangeListener<Flow> {
389
390         FlowListener() {
391             super(dataBroker, LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class).child(Node.class)
392                     .augmentation(FlowCapableNode.class).child(Table.class).child(Flow.class),
393                     Executors.newSingleThreadExecutor("FlowListener", LOG));
394         }
395
396         @Override
397         public void remove(InstanceIdentifier<Flow> identifier, Flow del) {
398             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
399             notifyTaskIfRequired(dpId, del);
400         }
401
402         private void notifyTaskIfRequired(Uint64 dpId, Flow flow) {
403             FlowInfoKey flowKey = new FlowInfoKey(dpId, flow.getTableId().toJava(),
404                                                   flow.getMatch(), flow.getId().getValue());
405             Runnable notifyTask = flowMap.remove(flowKey);
406             if (notifyTask == null) {
407                 return;
408             }
409             executorService.execute(notifyTask);
410         }
411
412         @Override
413         public void update(InstanceIdentifier<Flow> identifier, Flow original, Flow update) {
414         }
415
416         @Override
417         public void add(InstanceIdentifier<Flow> identifier, Flow add) {
418             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
419             notifyTaskIfRequired(dpId, add);
420         }
421
422     }
423
424     private class FlowConfigListener extends AbstractClusteredAsyncDataTreeChangeListener<Flow> {
425         private final Logger flowLog = LoggerFactory.getLogger(FlowConfigListener.class);
426
427         FlowConfigListener() {
428             super(dataBroker, LogicalDatastoreType.OPERATIONAL,InstanceIdentifier.create(Nodes.class).child(Node.class)
429                     .augmentation(FlowCapableNode.class).child(Table.class).child(Flow.class),
430                     Executors.newSingleThreadExecutor("FlowConfigListener", LOG));
431         }
432
433         @Override
434         public void remove(InstanceIdentifier<Flow> identifier, Flow del) {
435             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
436             flowLog.trace("FlowId {} deleted from Table {} on DPN {}",
437                 del.getId().getValue(), del.getTableId(), dpId);
438         }
439
440         @Override
441         public void update(InstanceIdentifier<Flow> identifier, Flow original, Flow update) {
442         }
443
444         @Override
445         public void add(InstanceIdentifier<Flow> identifier, Flow add) {
446             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
447             flowLog.debug("FlowId {} added to Table {} on DPN {}",
448                 add.getId().getValue(), add.getTableId(), dpId);
449         }
450     }
451
452     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
453             justification = "https://github.com/spotbugs/spotbugs/issues/811")
454     private static Uint64 getDpnFromString(String dpnString) {
455         String[] split = dpnString.split(":");
456         return Uint64.valueOf(split[1]);
457     }
458
459     @Override
460     public FluentFuture<Void> installFlow(FlowEntity flowEntity) {
461         return installFlowInternal(flowEntity);
462     }
463
464     @Override
465     public FluentFuture<Void> installFlow(Uint64 dpId, Flow flowEntity) {
466         return installFlowInternal(dpId, flowEntity);
467     }
468
469     @Override
470     public FluentFuture<Void> installFlow(Uint64 dpId, FlowEntity flowEntity) {
471         return installFlowInternal(dpId, flowEntity.getFlowBuilder().build());
472     }
473
474     @Override
475     public ListenableFuture<Void> removeFlow(Uint64 dpId, short tableId, FlowId flowId) {
476         ListenableFuture<Void> future = txRunner.callWithNewWriteOnlyTransactionAndSubmit(
477             tx -> deleteFlow(dpId, tableId, new FlowKey(flowId), tx));
478
479         Futures.addCallback(future, new FutureCallback<Void>() {
480             @Override
481             public void onSuccess(final Void result) {
482                 // Committed successfully
483                 LOG.debug("Delete Flow -- Committed successfully");
484             }
485
486             @Override
487             public void onFailure(final Throwable throwable) {
488                 // Transaction failed
489                 if (throwable instanceof OptimisticLockFailedException) {
490                     // Failed because of concurrent transaction modifying same
491                     // data
492                     LOG.error("Delete Flow -- Failed because of concurrent transaction modifying same data");
493                 } else {
494                     // Some other type of TransactionCommitFailedException
495                     LOG.error("Delete Flow -- Some other type of TransactionCommitFailedException", throwable);
496                 }
497             }
498
499         }, MoreExecutors.directExecutor());
500
501         return future;
502     }
503
504     @Override
505     public FluentFuture<Void> removeFlow(Uint64 dpId, Flow flowEntity) {
506         return removeFlowNewInternal(dpId, flowEntity);
507     }
508
509     @Override
510     public FluentFuture<Void> removeFlow(FlowEntity flowEntity) {
511         return removeFlowInternal(flowEntity);
512     }
513
514     @Override
515     public void removeFlow(TypedReadWriteTransaction<Configuration> tx, FlowEntity flowEntity)
516             throws ExecutionException, InterruptedException {
517         removeFlow(tx, flowEntity.getDpnId(), flowEntity.getFlowId(), flowEntity.getTableId());
518     }
519
520     @Override
521     public void removeFlow(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, Flow flow)
522             throws ExecutionException, InterruptedException {
523         removeFlow(tx, dpId, flow.key(), flow.getTableId().toJava());
524     }
525
526     @Override
527     public void removeFlow(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, String flowId, short tableId)
528             throws ExecutionException, InterruptedException {
529         removeFlow(tx, dpId, new FlowKey(new FlowId(flowId)), tableId);
530     }
531
532     @Override
533     public void removeFlow(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, FlowKey flowKey,
534             short tableId) throws ExecutionException, InterruptedException {
535         InstanceIdentifier<Flow> flowInstanceIdentifier = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
536         if (tx.read(flowInstanceIdentifier).get().isPresent()) {
537             tx.delete(flowInstanceIdentifier);
538         }
539     }
540
541     @Override
542     public void removeGroup(GroupEntity groupEntity) {
543         removeGroupInternal(groupEntity.getDpnId(), groupEntity.getGroupId());
544     }
545
546     @Override
547     public void removeGroup(TypedReadWriteTransaction<Configuration> tx, GroupEntity groupEntity)
548             throws ExecutionException, InterruptedException {
549         removeGroup(tx, groupEntity.getDpnId(), groupEntity.getGroupId());
550     }
551
552     @Override
553     public void removeGroup(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, Group group)
554             throws ExecutionException, InterruptedException {
555         removeGroup(tx, dpId, group.getGroupId().getValue().toJava());
556     }
557
558     @Override
559     public void removeGroup(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, long groupId)
560             throws ExecutionException, InterruptedException {
561         Node nodeDpn = buildDpnNode(dpId);
562         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
563         if (tx.read(groupInstanceId).get().isPresent()) {
564             tx.delete(groupInstanceId);
565         } else {
566             LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
567         }
568     }
569
570     @Override
571     public void syncRemoveFlow(FlowEntity flowEntity, long delayTime) {
572         syncSetUpFlowInternal(flowEntity, true);
573     }
574
575     @Override
576     public void syncRemoveFlow(FlowEntity flowEntity) {
577         syncSetUpFlowInternal(flowEntity, true);
578     }
579
580     @Override
581     public void syncInstallFlow(FlowEntity flowEntity, long delayTime) {
582         syncSetUpFlowInternal(flowEntity, false);
583     }
584
585     @Override
586     public void syncInstallFlow(FlowEntity flowEntity) {
587         syncSetUpFlowInternal(flowEntity, false);
588     }
589
590     @Override
591     public void syncInstallGroup(GroupEntity groupEntity) {
592         syncSetUpGroupInternal(groupEntity, false);
593     }
594
595     @Override
596     public void syncRemoveGroup(GroupEntity groupEntity) {
597         syncSetUpGroupInternal(groupEntity, true);
598     }
599
600     @Override
601     public void addFlow(TypedWriteTransaction<Configuration> tx, FlowEntity flowEntity) {
602         addFlow(tx, flowEntity.getDpnId(), flowEntity.getFlowBuilder().build());
603     }
604
605     @Override
606     public void addFlow(TypedWriteTransaction<Configuration> tx, Uint64 dpId, Flow flow) {
607         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId,
608                                                             flow.getTableId().toJava(), flow.key());
609         tx.mergeParentStructurePut(flowInstanceId, flow);
610     }
611
612     @Override
613     public void addGroup(TypedWriteTransaction<Configuration> tx, GroupEntity groupEntity) {
614         addGroup(tx, groupEntity.getDpnId(), groupEntity.getGroupBuilder().build());
615     }
616
617     @Override
618     public void addGroup(TypedWriteTransaction<Configuration> tx, Uint64 dpId, Group group) {
619         Node nodeDpn = buildDpnNode(dpId);
620         long groupId = group.getGroupId().getValue().toJava();
621         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
622         tx.mergeParentStructurePut(groupInstanceId, group);
623     }
624
625     @Override
626     public void addBucket(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, long groupId, Bucket bucket)
627             throws ExecutionException, InterruptedException {
628         Node nodeDpn = buildDpnNode(dpId);
629         if (groupExists(tx, nodeDpn, groupId)) {
630             InstanceIdentifier<Bucket> bucketInstanceId = buildBucketInstanceIdentifier(groupId,
631                 bucket.getBucketId().getValue().toJava(), nodeDpn);
632             tx.put(bucketInstanceId, bucket);
633         }
634     }
635
636     @Override
637     public void removeBucket(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, long groupId, long bucketId)
638             throws ExecutionException, InterruptedException {
639         Node nodeDpn = buildDpnNode(dpId);
640         if (groupExists(tx, nodeDpn, groupId)) {
641             InstanceIdentifier<Bucket> bucketInstanceId = buildBucketInstanceIdentifier(groupId, bucketId, nodeDpn);
642             tx.delete(bucketInstanceId);
643         } else {
644             LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
645         }
646     }
647
648     @Override
649     public boolean groupExists(Uint64 dpId, long groupId) {
650         return groupExists(buildDpnNode(dpId), groupId);
651     }
652
653     private boolean groupExists(Node nodeDpn, long groupId) {
654         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
655         try {
656             return singleTxDb.syncReadOptional(LogicalDatastoreType.CONFIGURATION, groupInstanceId).isPresent();
657         } catch (ExecutionException | InterruptedException e) {
658             LOG.warn("Exception while reading group {} for Node {}", groupId, nodeDpn.key());
659         }
660         return false;
661     }
662
663     private static boolean groupExists(TypedReadTransaction<Configuration> tx, Node nodeDpn, long groupId)
664            throws ExecutionException, InterruptedException {
665         return tx.exists(buildGroupInstanceIdentifier(groupId, nodeDpn)).get();
666     }
667
668     private static InstanceIdentifier<Group> buildGroupInstanceIdentifier(long groupId, Node nodeDpn) {
669         InstanceIdentifier<Group> groupInstanceId = InstanceIdentifier.builder(Nodes.class)
670                 .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
671                 .child(Group.class, new GroupKey(new GroupId(groupId))).build();
672         return groupInstanceId;
673     }
674
675     private boolean flowExists(Uint64 dpId, short tableId, FlowKey flowKey) {
676         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
677         try {
678             Optional<Flow> flowOptional = singleTxDb.syncReadOptional(LogicalDatastoreType.CONFIGURATION,
679                     flowInstanceId);
680             return flowOptional.isPresent();
681         } catch (ExecutionException | InterruptedException e) {
682             LOG.warn("Exception while reading flow {} for dpn {}", flowKey, dpId);
683         }
684         return false;
685     }
686
687     private static InstanceIdentifier<Flow> buildFlowInstanceIdentifier(Uint64 dpnId, short tableId,
688             FlowKey flowKey) {
689         InstanceIdentifier<Flow> flowInstanceId = InstanceIdentifier.builder(Nodes.class)
690                 .child(Node.class, buildDpnNode(dpnId).key()).augmentation(FlowCapableNode.class)
691                 .child(Table.class, new TableKey(tableId)).child(Flow.class, flowKey).build();
692         return flowInstanceId;
693     }
694
695     private static InstanceIdentifier<Bucket> buildBucketInstanceIdentifier(long groupId, long bucketId,
696             Node nodeDpn) {
697         InstanceIdentifier<Bucket> bucketInstanceId = InstanceIdentifier.builder(Nodes.class)
698                 .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
699                 .child(Group.class, new GroupKey(new GroupId(groupId)))
700                 .child(Buckets.class)
701                 .child(Bucket.class, new BucketKey(new BucketId(bucketId))).build();
702         return bucketInstanceId;
703     }
704
705     private static FluentFuture<Void> addCallBackForDeleteFlowAndReturn(FluentFuture<Void> fluentFuture) {
706         return callBack(fluentFuture, "Delete Flow");
707     }
708
709     private static FluentFuture<Void> addCallBackForInstallFlowAndReturn(FluentFuture<Void> fluentFuture) {
710         return callBack(fluentFuture, "Install Flow");
711     }
712
713     private static FluentFuture<Void> addCallBackForInstallGroupAndReturn(FluentFuture<Void> fluentFuture) {
714         return callBack(fluentFuture, "Install Group");
715     }
716
717     // Generic for handling callbacks
718     private static FluentFuture<Void> callBack(FluentFuture<Void> fluentFuture, String log) {
719         fluentFuture.addCallback(new FutureCallback<Void>() {
720             @Override
721             public void onSuccess(final Void result) {
722                 // Committed successfully
723                 LOG.debug("{} -- Committedsuccessfully ", log);
724             }
725
726             @Override
727             public void onFailure(final Throwable throwable) {
728                 // Transaction failed
729
730                 if (throwable instanceof OptimisticLockFailedException) {
731                     // Failed because of concurrent transaction modifying same
732                     // data
733                     LOG.error("{} -- Failed because of concurrent transaction modifying same data", log);
734                 } else {
735                     // Some other type of TransactionCommitFailedException
736                     LOG.error("{} -- Some other type of TransactionCommitFailedException",log, throwable);
737                 }
738             }
739         }, MoreExecutors.directExecutor());
740         return fluentFuture;
741     }
742 }