Convert itm-impl to use mdsal-binding-util
[genius.git] / mdsalutil / mdsalutil-impl / src / main / java / org / opendaylight / genius / mdsalutil / internal / MDSALManager.java
1 /*
2  * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.genius.mdsalutil.internal;
9
10 import static org.opendaylight.infrautils.utils.concurrent.Executors.newListeningSingleThreadExecutor;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.util.concurrent.FluentFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
19 import java.util.Objects;
20 import java.util.Optional;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.ExecutorService;
25 import javax.inject.Inject;
26 import javax.inject.Singleton;
27 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
28 import org.opendaylight.genius.mdsalutil.FlowEntity;
29 import org.opendaylight.genius.mdsalutil.FlowInfoKey;
30 import org.opendaylight.genius.mdsalutil.GroupEntity;
31 import org.opendaylight.genius.mdsalutil.GroupInfoKey;
32 import org.opendaylight.genius.mdsalutil.MDSALUtil;
33 import org.opendaylight.genius.mdsalutil.interfaces.IMdsalApiManager;
34 import org.opendaylight.infrautils.inject.AbstractLifecycle;
35 import org.opendaylight.infrautils.utils.concurrent.Executors;
36 import org.opendaylight.infrautils.utils.concurrent.NamedLocks;
37 import org.opendaylight.infrautils.utils.concurrent.NamedSimpleReentrantLock.Acquired;
38 import org.opendaylight.mdsal.binding.api.DataBroker;
39 import org.opendaylight.mdsal.binding.util.Datastore;
40 import org.opendaylight.mdsal.binding.util.Datastore.Configuration;
41 import org.opendaylight.mdsal.binding.util.RetryingManagedNewTransactionRunner;
42 import org.opendaylight.mdsal.binding.util.TypedReadTransaction;
43 import org.opendaylight.mdsal.binding.util.TypedReadWriteTransaction;
44 import org.opendaylight.mdsal.binding.util.TypedWriteTransaction;
45 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
46 import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
47 import org.opendaylight.serviceutils.tools.listener.AbstractClusteredAsyncDataTreeChangeListener;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.BucketId;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.BucketKey;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
68 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
69 import org.opendaylight.yangtools.yang.common.Uint64;
70 import org.slf4j.Logger;
71 import org.slf4j.LoggerFactory;
72
73 @Singleton
74 @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
75 public class MDSALManager extends AbstractLifecycle implements IMdsalApiManager {
76     private static final class FlowLockKey {
77         private final Uint64 dpId;
78         private final FlowKey flowKey;
79         private final short tableId;
80
81         FlowLockKey(Uint64 dpId, short tableId, FlowKey flowKey) {
82             this.dpId = dpId;
83             this.tableId = tableId;
84             this.flowKey = flowKey;
85         }
86
87         @Override
88         public int hashCode() {
89             return 31 * Short.hashCode(tableId) + Objects.hash(dpId, flowKey);
90         }
91
92         @Override
93         public boolean equals(Object obj) {
94             if (this == obj) {
95                 return true;
96             }
97             if (!(obj instanceof FlowLockKey)) {
98                 return false;
99             }
100             final FlowLockKey other = (FlowLockKey) obj;
101             return tableId == other.tableId && Objects.equals(dpId, other.dpId)
102                     && Objects.equals(flowKey, other.flowKey);
103         }
104     }
105
106     private static final class GroupLockKey {
107         private final Uint64 dpId;
108         private final long groupId;
109
110         GroupLockKey(long groupId, Uint64 dpId) {
111             this.groupId = groupId;
112             this.dpId = dpId;
113         }
114
115         @Override
116         public int hashCode() {
117             return 31 * Long.hashCode(groupId) + Objects.hashCode(dpId);
118         }
119
120         @Override
121         public boolean equals(Object obj) {
122             if (this == obj) {
123                 return true;
124             }
125             if (!(obj instanceof GroupLockKey)) {
126                 return false;
127             }
128             final GroupLockKey other = (GroupLockKey) obj;
129             return groupId == other.groupId && Objects.equals(dpId, other.dpId);
130         }
131     }
132
133     private static final Logger LOG = LoggerFactory.getLogger(MDSALManager.class);
134     private static final NamedLocks<FlowLockKey> FLOW_LOCKS = new NamedLocks<>();
135     private static final NamedLocks<GroupLockKey> GROUP_LOCKS = new NamedLocks<>();
136
137     private final DataBroker dataBroker;
138     private final RetryingManagedNewTransactionRunner txRunner;
139     private final FlowBatchingUtils flowBatchingUtils = new FlowBatchingUtils();
140
141     private final ConcurrentMap<FlowInfoKey, Runnable> flowMap = new ConcurrentHashMap<>();
142     private final ConcurrentMap<GroupInfoKey, Runnable> groupMap = new ConcurrentHashMap<>();
143     private final ExecutorService executorService = newListeningSingleThreadExecutor("genius-MDSALManager", LOG);
144     private final SingleTransactionDataBroker singleTxDb;
145     private final FlowListener flowListener = new FlowListener();
146     private final FlowConfigListener flowConfigListener = new FlowConfigListener();
147     private final GroupListener groupListener = new GroupListener();
148
149     /**
150      * Writes the flows and Groups to the MD SAL DataStore which will be sent to
151      * the openflowplugin for installing flows/groups on the switch. Other
152      * modules of VPN service that wants to install flows / groups on the switch
153      * uses this utility
154      *
155      * @param db
156      *            dataBroker reference
157      * @param pktProcService
158      *            PacketProcessingService for sending the packet outs
159      */
160     @Deprecated
161     public MDSALManager(DataBroker db, PacketProcessingService pktProcService) {
162         this(db);
163     }
164
165     @Inject
166     public MDSALManager(DataBroker db) {
167         this.dataBroker = db;
168         this.txRunner = new RetryingManagedNewTransactionRunner(db);
169         singleTxDb = new SingleTransactionDataBroker(dataBroker);
170         LOG.info("MDSAL Manager Initialized ");
171     }
172
173     @Override
174     protected void start() {
175         LOG.info("{} start", getClass().getSimpleName());
176
177         int batchSize = Integer.getInteger("batch.size", 1000);
178         int batchInterval = Integer.getInteger("batch.wait.time", 500);
179
180         flowBatchingUtils.registerWithBatchManager(new MdSalUtilBatchHandler(dataBroker, batchSize, batchInterval));
181     }
182
183     @Override
184     protected void stop() {
185         LOG.info("{} stop", getClass().getSimpleName());
186
187         flowListener.close();
188         flowConfigListener.close();
189         groupListener.close();
190     }
191
192     @VisibleForTesting
193     FluentFuture<?> installFlowInternal(FlowEntity flowEntity) {
194         return addCallBackForInstallFlowAndReturn(txRunner
195             .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
196                 tx -> writeFlowEntityInternal(flowEntity, tx)));
197     }
198
199     private FluentFuture<?> installFlowInternal(Uint64 dpId, Flow flow) {
200         return txRunner.callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
201             tx -> writeFlowInternal(dpId, flow, tx));
202     }
203
204     private static void writeFlowEntityInternal(FlowEntity flowEntity,
205             TypedWriteTransaction<Datastore.Configuration> tx) {
206         FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
207         FlowBuilder flowbld = flowEntity.getFlowBuilder();
208         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(flowEntity.getDpnId(),
209                 flowEntity.getTableId(), flowKey);
210         tx.mergeParentStructurePut(flowInstanceId, flowbld.build());
211     }
212
213     private static void writeFlowInternal(Uint64 dpId, Flow flow,
214             TypedWriteTransaction<Datastore.Configuration> tx) {
215         FlowKey flowKey = new FlowKey(new FlowId(flow.getId()));
216         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId,
217                                                             flow.getTableId().toJava(), flowKey);
218         tx.mergeParentStructurePut(flowInstanceId, flow);
219     }
220
221     @VisibleForTesting
222     FluentFuture<?> installGroupInternal(GroupEntity groupEntity) {
223         return addCallBackForInstallGroupAndReturn(txRunner
224             .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
225                 tx -> writeGroupEntityInternal(groupEntity, tx)));
226     }
227
228     private static void writeGroupEntityInternal(GroupEntity groupEntity,
229             TypedWriteTransaction<Datastore.Configuration> tx) {
230         Group group = groupEntity.getGroupBuilder().build();
231         Node nodeDpn = buildDpnNode(groupEntity.getDpnId());
232         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupEntity.getGroupId(), nodeDpn);
233         tx.mergeParentStructurePut(groupInstanceId, group);
234     }
235
236     @VisibleForTesting
237     FluentFuture<?> removeFlowInternal(FlowEntity flowEntity) {
238         return addCallBackForDeleteFlowAndReturn(txRunner
239                 .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
240                     tx -> deleteFlowEntityInternal(flowEntity, tx)));
241     }
242
243     private void deleteFlowEntityInternal(FlowEntity flowEntity, TypedWriteTransaction<Datastore.Configuration> tx) {
244         Uint64 dpId = flowEntity.getDpnId();
245         short tableId = flowEntity.getTableId();
246         FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
247         deleteFlow(dpId, tableId, flowKey, tx);
248     }
249
250     private void deleteFlow(Uint64 dpId, short tableId, FlowKey flowKey,
251                             TypedWriteTransaction<Datastore.Configuration> tx) {
252         if (flowExists(dpId, tableId, flowKey)) {
253             InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
254             tx.delete(flowInstanceId);
255         } else {
256             LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
257         }
258     }
259
260     private FluentFuture<?> removeFlowNewInternal(Uint64 dpnId, Flow flowEntity) {
261         LOG.debug("Remove flow {}", flowEntity);
262         return txRunner.callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
263             tx -> {
264                 FlowKey flowKey = new FlowKey(flowEntity.getId());
265                 short tableId = flowEntity.getTableId().toJava();
266                 deleteFlow(dpnId, tableId, flowKey, tx);
267             });
268     }
269
270     @VisibleForTesting
271     FluentFuture<?> removeGroupInternal(Uint64 dpnId, long groupId) {
272         return addCallBackForInstallGroupAndReturn(txRunner
273             .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
274                 tx -> removeGroupInternal(dpnId, groupId, tx)));
275     }
276
277     private void removeGroupInternal(Uint64 dpnId, long groupId,
278                                      TypedWriteTransaction<Datastore.Configuration> tx) {
279         Node nodeDpn = buildDpnNode(dpnId);
280         if (groupExists(nodeDpn, groupId)) {
281             InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
282             tx.delete(groupInstanceId);
283         } else {
284             LOG.debug("Group {} does not exist for dpn {}", groupId, dpnId);
285         }
286     }
287
288     private static Node buildDpnNode(Uint64 dpnId) {
289         NodeId nodeId = new NodeId("openflow:" + dpnId);
290         Node nodeDpn = new NodeBuilder().setId(nodeId).withKey(new NodeKey(nodeId)).build();
291
292         return nodeDpn;
293     }
294
295     private void syncSetUpFlowInternal(FlowEntity flowEntity, boolean isRemove) {
296         if (LOG.isTraceEnabled()) {
297             LOG.trace("syncSetUpFlow for flowEntity {} ", flowEntity);
298         }
299         Flow flow = flowEntity.getFlowBuilder().build();
300         String flowId = flowEntity.getFlowId();
301         short tableId = flowEntity.getTableId();
302         Uint64 dpId = flowEntity.getDpnId();
303         FlowKey flowKey = new FlowKey(new FlowId(flowId));
304         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
305
306         if (isRemove) {
307             try (Acquired lock = FLOW_LOCKS.acquire(new FlowLockKey(dpId, tableId, flowKey))) {
308                 if (flowExists(dpId, tableId, flowKey)) {
309                     MDSALUtil.syncDelete(dataBroker, LogicalDatastoreType.CONFIGURATION, flowInstanceId);
310                 } else {
311                     LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
312                 }
313             }
314         } else {
315             MDSALUtil.syncWrite(dataBroker, LogicalDatastoreType.CONFIGURATION, flowInstanceId, flow);
316         }
317     }
318
319     private void syncSetUpGroupInternal(GroupEntity groupEntity, boolean isRemove) {
320         if (LOG.isTraceEnabled()) {
321             LOG.trace("syncSetUpGroup for groupEntity {} ", groupEntity);
322         }
323         Group group = groupEntity.getGroupBuilder().build();
324         Uint64 dpId = groupEntity.getDpnId();
325         long groupId = groupEntity.getGroupId();
326         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, buildDpnNode(dpId));
327
328         if (isRemove) {
329             try (Acquired lock = GROUP_LOCKS.acquire(new GroupLockKey(groupId, dpId))) {
330                 if (groupExists(dpId, groupId)) {
331                     MDSALUtil.syncDelete(dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId);
332                 } else {
333                     LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
334                 }
335             }
336         } else {
337             MDSALUtil.syncWrite(dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId, group);
338         }
339     }
340
341     private class GroupListener extends AbstractClusteredAsyncDataTreeChangeListener<Group> {
342
343         GroupListener() {
344             super(dataBroker, LogicalDatastoreType.OPERATIONAL,InstanceIdentifier.create(Nodes.class).child(Node.class)
345                     .augmentation(FlowCapableNode.class).child(Group.class),
346                     Executors.newSingleThreadExecutor("GroupListener", LOG));
347         }
348
349         @Override
350         public void remove(InstanceIdentifier<Group> identifier, Group del) {
351             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
352             executeNotifyTaskIfRequired(dpId, del);
353         }
354
355         private void executeNotifyTaskIfRequired(Uint64 dpId, Group group) {
356             GroupInfoKey groupKey = new GroupInfoKey(dpId, group.getGroupId().getValue().toJava());
357             Runnable notifyTask = groupMap.remove(groupKey);
358             if (notifyTask == null) {
359                 return;
360             }
361             executorService.execute(notifyTask);
362         }
363
364         @Override
365         public void update(InstanceIdentifier<Group> identifier, Group original, Group update) {
366             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
367             executeNotifyTaskIfRequired(dpId, update);
368         }
369
370         @Override
371         public void add(InstanceIdentifier<Group> identifier, Group add) {
372             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
373             executeNotifyTaskIfRequired(dpId, add);
374         }
375     }
376
377     private class FlowListener extends AbstractClusteredAsyncDataTreeChangeListener<Flow> {
378
379         FlowListener() {
380             super(dataBroker, LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class).child(Node.class)
381                     .augmentation(FlowCapableNode.class).child(Table.class).child(Flow.class),
382                     Executors.newSingleThreadExecutor("FlowListener", LOG));
383         }
384
385         @Override
386         public void remove(InstanceIdentifier<Flow> identifier, Flow del) {
387             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
388             notifyTaskIfRequired(dpId, del);
389         }
390
391         private void notifyTaskIfRequired(Uint64 dpId, Flow flow) {
392             FlowInfoKey flowKey = new FlowInfoKey(dpId, flow.getTableId().toJava(),
393                                                   flow.getMatch(), flow.getId().getValue());
394             Runnable notifyTask = flowMap.remove(flowKey);
395             if (notifyTask == null) {
396                 return;
397             }
398             executorService.execute(notifyTask);
399         }
400
401         @Override
402         public void update(InstanceIdentifier<Flow> identifier, Flow original, Flow update) {
403         }
404
405         @Override
406         public void add(InstanceIdentifier<Flow> identifier, Flow add) {
407             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
408             notifyTaskIfRequired(dpId, add);
409         }
410
411     }
412
413     private class FlowConfigListener extends AbstractClusteredAsyncDataTreeChangeListener<Flow> {
414         private final Logger flowLog = LoggerFactory.getLogger(FlowConfigListener.class);
415
416         FlowConfigListener() {
417             super(dataBroker, LogicalDatastoreType.OPERATIONAL,InstanceIdentifier.create(Nodes.class).child(Node.class)
418                     .augmentation(FlowCapableNode.class).child(Table.class).child(Flow.class),
419                     Executors.newSingleThreadExecutor("FlowConfigListener", LOG));
420         }
421
422         @Override
423         public void remove(InstanceIdentifier<Flow> identifier, Flow del) {
424             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
425             flowLog.trace("FlowId {} deleted from Table {} on DPN {}",
426                 del.getId().getValue(), del.getTableId(), dpId);
427         }
428
429         @Override
430         public void update(InstanceIdentifier<Flow> identifier, Flow original, Flow update) {
431         }
432
433         @Override
434         public void add(InstanceIdentifier<Flow> identifier, Flow add) {
435             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
436             flowLog.debug("FlowId {} added to Table {} on DPN {}",
437                 add.getId().getValue(), add.getTableId(), dpId);
438         }
439     }
440
441     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
442             justification = "https://github.com/spotbugs/spotbugs/issues/811")
443     private static Uint64 getDpnFromString(String dpnString) {
444         String[] split = dpnString.split(":");
445         return Uint64.valueOf(split[1]);
446     }
447
448     @Override
449     public FluentFuture<?> installFlow(FlowEntity flowEntity) {
450         return installFlowInternal(flowEntity);
451     }
452
453     @Override
454     public FluentFuture<?> installFlow(Uint64 dpId, Flow flowEntity) {
455         return installFlowInternal(dpId, flowEntity);
456     }
457
458     @Override
459     public FluentFuture<?> installFlow(Uint64 dpId, FlowEntity flowEntity) {
460         return installFlowInternal(dpId, flowEntity.getFlowBuilder().build());
461     }
462
463     @Override
464     public ListenableFuture<?> removeFlow(Uint64 dpId, short tableId, FlowId flowId) {
465         ListenableFuture<?> future = txRunner.callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
466             tx -> deleteFlow(dpId, tableId, new FlowKey(flowId), tx));
467
468         Futures.addCallback(future, new FutureCallback<Object>() {
469             @Override
470             public void onSuccess(final Object result) {
471                 // Committed successfully
472                 LOG.debug("Delete Flow -- Committed successfully");
473             }
474
475             @Override
476             public void onFailure(final Throwable throwable) {
477                 // Transaction failed
478                 if (throwable instanceof OptimisticLockFailedException) {
479                     // Failed because of concurrent transaction modifying same
480                     // data
481                     LOG.error("Delete Flow -- Failed because of concurrent transaction modifying same data");
482                 } else {
483                     // Some other type of TransactionCommitFailedException
484                     LOG.error("Delete Flow -- Some other type of TransactionCommitFailedException", throwable);
485                 }
486             }
487
488         }, MoreExecutors.directExecutor());
489
490         return future;
491     }
492
493     @Override
494     public FluentFuture<?> removeFlow(Uint64 dpId, Flow flowEntity) {
495         return removeFlowNewInternal(dpId, flowEntity);
496     }
497
498     @Override
499     public FluentFuture<?> removeFlow(FlowEntity flowEntity) {
500         return removeFlowInternal(flowEntity);
501     }
502
503     @Override
504     public void removeFlow(TypedReadWriteTransaction<Configuration> tx, FlowEntity flowEntity)
505             throws ExecutionException, InterruptedException {
506         removeFlow(tx, flowEntity.getDpnId(), flowEntity.getFlowId(), flowEntity.getTableId());
507     }
508
509     @Override
510     public void removeFlow(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, Flow flow)
511             throws ExecutionException, InterruptedException {
512         removeFlow(tx, dpId, flow.key(), flow.getTableId().toJava());
513     }
514
515     @Override
516     public void removeFlow(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, String flowId, short tableId)
517             throws ExecutionException, InterruptedException {
518         removeFlow(tx, dpId, new FlowKey(new FlowId(flowId)), tableId);
519     }
520
521     @Override
522     public void removeFlow(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, FlowKey flowKey,
523             short tableId) throws ExecutionException, InterruptedException {
524         InstanceIdentifier<Flow> flowInstanceIdentifier = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
525         if (tx.read(flowInstanceIdentifier).get().isPresent()) {
526             tx.delete(flowInstanceIdentifier);
527         }
528     }
529
530     @Override
531     public void removeGroup(GroupEntity groupEntity) {
532         removeGroupInternal(groupEntity.getDpnId(), groupEntity.getGroupId());
533     }
534
535     @Override
536     public void removeGroup(TypedReadWriteTransaction<Configuration> tx, GroupEntity groupEntity)
537             throws ExecutionException, InterruptedException {
538         removeGroup(tx, groupEntity.getDpnId(), groupEntity.getGroupId());
539     }
540
541     @Override
542     public void removeGroup(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, Group group)
543             throws ExecutionException, InterruptedException {
544         removeGroup(tx, dpId, group.getGroupId().getValue().toJava());
545     }
546
547     @Override
548     public void removeGroup(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, long groupId)
549             throws ExecutionException, InterruptedException {
550         Node nodeDpn = buildDpnNode(dpId);
551         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
552         if (tx.read(groupInstanceId).get().isPresent()) {
553             tx.delete(groupInstanceId);
554         } else {
555             LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
556         }
557     }
558
559     @Override
560     public void syncRemoveFlow(FlowEntity flowEntity, long delayTime) {
561         syncSetUpFlowInternal(flowEntity, true);
562     }
563
564     @Override
565     public void syncRemoveFlow(FlowEntity flowEntity) {
566         syncSetUpFlowInternal(flowEntity, true);
567     }
568
569     @Override
570     public void syncInstallFlow(FlowEntity flowEntity, long delayTime) {
571         syncSetUpFlowInternal(flowEntity, false);
572     }
573
574     @Override
575     public void syncInstallFlow(FlowEntity flowEntity) {
576         syncSetUpFlowInternal(flowEntity, false);
577     }
578
579     @Override
580     public void syncInstallGroup(GroupEntity groupEntity) {
581         syncSetUpGroupInternal(groupEntity, false);
582     }
583
584     @Override
585     public void syncRemoveGroup(GroupEntity groupEntity) {
586         syncSetUpGroupInternal(groupEntity, true);
587     }
588
589     @Override
590     public void addFlow(TypedWriteTransaction<Configuration> tx, FlowEntity flowEntity) {
591         addFlow(tx, flowEntity.getDpnId(), flowEntity.getFlowBuilder().build());
592     }
593
594     @Override
595     public void addFlow(TypedWriteTransaction<Configuration> tx, Uint64 dpId, Flow flow) {
596         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId,
597                                                             flow.getTableId().toJava(), flow.key());
598         tx.mergeParentStructurePut(flowInstanceId, flow);
599     }
600
601     @Override
602     public void addGroup(TypedWriteTransaction<Configuration> tx, GroupEntity groupEntity) {
603         addGroup(tx, groupEntity.getDpnId(), groupEntity.getGroupBuilder().build());
604     }
605
606     @Override
607     public void addGroup(TypedWriteTransaction<Configuration> tx, Uint64 dpId, Group group) {
608         Node nodeDpn = buildDpnNode(dpId);
609         long groupId = group.getGroupId().getValue().toJava();
610         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
611         tx.mergeParentStructurePut(groupInstanceId, group);
612     }
613
614     @Override
615     public void addBucket(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, long groupId, Bucket bucket)
616             throws ExecutionException, InterruptedException {
617         Node nodeDpn = buildDpnNode(dpId);
618         if (groupExists(tx, nodeDpn, groupId)) {
619             InstanceIdentifier<Bucket> bucketInstanceId = buildBucketInstanceIdentifier(groupId,
620                 bucket.getBucketId().getValue().toJava(), nodeDpn);
621             tx.put(bucketInstanceId, bucket);
622         }
623     }
624
625     @Override
626     public void removeBucket(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, long groupId, long bucketId)
627             throws ExecutionException, InterruptedException {
628         Node nodeDpn = buildDpnNode(dpId);
629         if (groupExists(tx, nodeDpn, groupId)) {
630             InstanceIdentifier<Bucket> bucketInstanceId = buildBucketInstanceIdentifier(groupId, bucketId, nodeDpn);
631             tx.delete(bucketInstanceId);
632         } else {
633             LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
634         }
635     }
636
637     @Override
638     public boolean groupExists(Uint64 dpId, long groupId) {
639         return groupExists(buildDpnNode(dpId), groupId);
640     }
641
642     private boolean groupExists(Node nodeDpn, long groupId) {
643         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
644         try {
645             return singleTxDb.syncReadOptional(LogicalDatastoreType.CONFIGURATION, groupInstanceId).isPresent();
646         } catch (ExecutionException | InterruptedException e) {
647             LOG.warn("Exception while reading group {} for Node {}", groupId, nodeDpn.key());
648         }
649         return false;
650     }
651
652     private static boolean groupExists(TypedReadTransaction<Configuration> tx, Node nodeDpn, long groupId)
653            throws ExecutionException, InterruptedException {
654         return tx.exists(buildGroupInstanceIdentifier(groupId, nodeDpn)).get();
655     }
656
657     private static InstanceIdentifier<Group> buildGroupInstanceIdentifier(long groupId, Node nodeDpn) {
658         InstanceIdentifier<Group> groupInstanceId = InstanceIdentifier.builder(Nodes.class)
659                 .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
660                 .child(Group.class, new GroupKey(new GroupId(groupId))).build();
661         return groupInstanceId;
662     }
663
664     private boolean flowExists(Uint64 dpId, short tableId, FlowKey flowKey) {
665         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
666         try {
667             Optional<Flow> flowOptional = singleTxDb.syncReadOptional(LogicalDatastoreType.CONFIGURATION,
668                     flowInstanceId);
669             return flowOptional.isPresent();
670         } catch (ExecutionException | InterruptedException e) {
671             LOG.warn("Exception while reading flow {} for dpn {}", flowKey, dpId);
672         }
673         return false;
674     }
675
676     private static InstanceIdentifier<Flow> buildFlowInstanceIdentifier(Uint64 dpnId, short tableId,
677             FlowKey flowKey) {
678         InstanceIdentifier<Flow> flowInstanceId = InstanceIdentifier.builder(Nodes.class)
679                 .child(Node.class, buildDpnNode(dpnId).key()).augmentation(FlowCapableNode.class)
680                 .child(Table.class, new TableKey(tableId)).child(Flow.class, flowKey).build();
681         return flowInstanceId;
682     }
683
684     private static InstanceIdentifier<Bucket> buildBucketInstanceIdentifier(long groupId, long bucketId,
685             Node nodeDpn) {
686         InstanceIdentifier<Bucket> bucketInstanceId = InstanceIdentifier.builder(Nodes.class)
687                 .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
688                 .child(Group.class, new GroupKey(new GroupId(groupId)))
689                 .child(Buckets.class)
690                 .child(Bucket.class, new BucketKey(new BucketId(bucketId))).build();
691         return bucketInstanceId;
692     }
693
694     private static FluentFuture<?> addCallBackForDeleteFlowAndReturn(FluentFuture<?> fluentFuture) {
695         return callBack(fluentFuture, "Delete Flow");
696     }
697
698     private static FluentFuture<?> addCallBackForInstallFlowAndReturn(FluentFuture<?> fluentFuture) {
699         return callBack(fluentFuture, "Install Flow");
700     }
701
702     private static FluentFuture<?> addCallBackForInstallGroupAndReturn(FluentFuture<?> fluentFuture) {
703         return callBack(fluentFuture, "Install Group");
704     }
705
706     // Generic for handling callbacks
707     private static FluentFuture<?> callBack(FluentFuture<?> fluentFuture, String log) {
708         fluentFuture.addCallback(new FutureCallback<Object>() {
709             @Override
710             public void onSuccess(final Object result) {
711                 // Committed successfully
712                 LOG.debug("{} -- Committedsuccessfully ", log);
713             }
714
715             @Override
716             public void onFailure(final Throwable throwable) {
717                 // Transaction failed
718
719                 if (throwable instanceof OptimisticLockFailedException) {
720                     // Failed because of concurrent transaction modifying same
721                     // data
722                     LOG.error("{} -- Failed because of concurrent transaction modifying same data", log);
723                 } else {
724                     // Some other type of TransactionCommitFailedException
725                     LOG.error("{} -- Some other type of TransactionCommitFailedException",log, throwable);
726                 }
727             }
728         }, MoreExecutors.directExecutor());
729         return fluentFuture;
730     }
731 }