d7fab55ed6d6cc826b7794805626c329ad6a440d
[genius.git] / mdsalutil / mdsalutil-impl / src / main / java / org / opendaylight / genius / mdsalutil / internal / MDSALManager.java
1 /*
2  * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.genius.mdsalutil.internal;
10
11 import static org.opendaylight.infrautils.utils.concurrent.Executors.newListeningSingleThreadExecutor;
12 import static org.opendaylight.mdsal.binding.api.WriteTransaction.CREATE_MISSING_PARENTS;
13
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.util.concurrent.FluentFuture;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.MoreExecutors;
20 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
21 import java.util.Optional;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentMap;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.ExecutorService;
26 import javax.inject.Inject;
27 import javax.inject.Singleton;
28 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
29 import org.opendaylight.genius.infra.Datastore;
30 import org.opendaylight.genius.infra.Datastore.Configuration;
31 import org.opendaylight.genius.infra.RetryingManagedNewTransactionRunner;
32 import org.opendaylight.genius.infra.TypedReadTransaction;
33 import org.opendaylight.genius.infra.TypedReadWriteTransaction;
34 import org.opendaylight.genius.infra.TypedWriteTransaction;
35 import org.opendaylight.genius.mdsalutil.FlowEntity;
36 import org.opendaylight.genius.mdsalutil.FlowInfoKey;
37 import org.opendaylight.genius.mdsalutil.GroupEntity;
38 import org.opendaylight.genius.mdsalutil.GroupInfoKey;
39 import org.opendaylight.genius.mdsalutil.MDSALUtil;
40 import org.opendaylight.genius.mdsalutil.interfaces.IMdsalApiManager;
41 import org.opendaylight.infrautils.inject.AbstractLifecycle;
42 import org.opendaylight.infrautils.utils.concurrent.Executors;
43 import org.opendaylight.mdsal.binding.api.DataBroker;
44 import org.opendaylight.mdsal.binding.api.WriteTransaction;
45 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
46 import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
47 import org.opendaylight.serviceutils.tools.listener.AbstractClusteredAsyncDataTreeChangeListener;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.BucketId;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.BucketKey;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
68 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
69 import org.opendaylight.yangtools.yang.common.Uint64;
70 import org.slf4j.Logger;
71 import org.slf4j.LoggerFactory;
72
73 @Singleton
74 @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
75 public class MDSALManager extends AbstractLifecycle implements IMdsalApiManager {
76
77     private static final Logger LOG = LoggerFactory.getLogger(MDSALManager.class);
78
79     private final DataBroker dataBroker;
80     private final RetryingManagedNewTransactionRunner txRunner;
81     private final FlowBatchingUtils flowBatchingUtils = new FlowBatchingUtils();
82
83     private final ConcurrentMap<FlowInfoKey, Runnable> flowMap = new ConcurrentHashMap<>();
84     private final ConcurrentMap<GroupInfoKey, Runnable> groupMap = new ConcurrentHashMap<>();
85     private final ExecutorService executorService = newListeningSingleThreadExecutor("genius-MDSALManager", LOG);
86     private final SingleTransactionDataBroker singleTxDb;
87     private final FlowListener flowListener = new FlowListener();
88     private final FlowConfigListener flowConfigListener = new FlowConfigListener();
89     private final GroupListener groupListener = new GroupListener();
90
91     /**
92      * Writes the flows and Groups to the MD SAL DataStore which will be sent to
93      * the openflowplugin for installing flows/groups on the switch. Other
94      * modules of VPN service that wants to install flows / groups on the switch
95      * uses this utility
96      *
97      * @param db
98      *            dataBroker reference
99      * @param pktProcService
100      *            PacketProcessingService for sending the packet outs
101      */
102     @Deprecated
103     public MDSALManager(DataBroker db, PacketProcessingService pktProcService) {
104         this(db);
105     }
106
107     @Inject
108     public MDSALManager(DataBroker db) {
109         this.dataBroker = db;
110         this.txRunner = new RetryingManagedNewTransactionRunner(db);
111         singleTxDb = new SingleTransactionDataBroker(dataBroker);
112         LOG.info("MDSAL Manager Initialized ");
113     }
114
115     @Override
116     protected void start() {
117         LOG.info("{} start", getClass().getSimpleName());
118
119         int batchSize = Integer.getInteger("batch.size", 1000);
120         int batchInterval = Integer.getInteger("batch.wait.time", 500);
121
122         flowBatchingUtils.registerWithBatchManager(new MdSalUtilBatchHandler(dataBroker, batchSize, batchInterval));
123     }
124
125     @Override
126     protected void stop() {
127         LOG.info("{} stop", getClass().getSimpleName());
128
129         flowListener.close();
130         flowConfigListener.close();
131         groupListener.close();
132     }
133
134     @VisibleForTesting
135     FluentFuture<Void> installFlowInternal(FlowEntity flowEntity) {
136         return addCallBackForInstallFlowAndReturn(txRunner
137             .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
138                 tx -> writeFlowEntityInternal(flowEntity, tx)));
139     }
140
141     private FluentFuture<Void> installFlowInternal(Uint64 dpId, Flow flow) {
142         return txRunner.callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
143             tx -> writeFlowInternal(dpId, flow, tx));
144     }
145
146     private static void writeFlowEntityInternal(FlowEntity flowEntity,
147             TypedWriteTransaction<Datastore.Configuration> tx) {
148         FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
149         FlowBuilder flowbld = flowEntity.getFlowBuilder();
150         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(flowEntity.getDpnId(),
151                 flowEntity.getTableId(), flowKey);
152         tx.put(flowInstanceId, flowbld.build(), true);
153     }
154
155     private static void writeFlowInternal(Uint64 dpId, Flow flow,
156             TypedWriteTransaction<Datastore.Configuration> tx) {
157         FlowKey flowKey = new FlowKey(new FlowId(flow.getId()));
158         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId,
159                                                             flow.getTableId().toJava(), flowKey);
160         tx.put(flowInstanceId, flow, true);
161     }
162
163     @VisibleForTesting
164     FluentFuture<Void> installGroupInternal(GroupEntity groupEntity) {
165         return addCallBackForInstallGroupAndReturn(txRunner
166             .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
167                 tx -> writeGroupEntityInternal(groupEntity, tx)));
168     }
169
170     private static void writeGroupEntityInternal(GroupEntity groupEntity,
171             TypedWriteTransaction<Datastore.Configuration> tx) {
172         Group group = groupEntity.getGroupBuilder().build();
173         Node nodeDpn = buildDpnNode(groupEntity.getDpnId());
174         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupEntity.getGroupId(), nodeDpn);
175         tx.put(groupInstanceId, group, true);
176     }
177
178     @VisibleForTesting
179     FluentFuture<Void> removeFlowInternal(FlowEntity flowEntity) {
180         return addCallBackForDeleteFlowAndReturn(txRunner
181                 .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
182                     tx -> deleteFlowEntityInternal(flowEntity, tx)));
183     }
184
185     private void deleteFlowEntityInternal(FlowEntity flowEntity, TypedWriteTransaction<Datastore.Configuration> tx) {
186         Uint64 dpId = flowEntity.getDpnId();
187         short tableId = flowEntity.getTableId();
188         FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
189         deleteFlow(dpId, tableId, flowKey, tx);
190     }
191
192     private void deleteFlow(Uint64 dpId, short tableId, FlowKey flowKey, WriteTransaction tx) {
193         if (flowExists(dpId, tableId, flowKey)) {
194             InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
195             tx.delete(LogicalDatastoreType.CONFIGURATION, flowInstanceId);
196         } else {
197             LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
198         }
199     }
200
201     private void deleteFlow(Uint64 dpId, short tableId, FlowKey flowKey,
202                             TypedWriteTransaction<Datastore.Configuration> tx) {
203         if (flowExists(dpId, tableId, flowKey)) {
204             InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
205             tx.delete(flowInstanceId);
206         } else {
207             LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
208         }
209     }
210
211     private FluentFuture<Void> removeFlowNewInternal(Uint64 dpnId, Flow flowEntity) {
212         LOG.debug("Remove flow {}", flowEntity);
213         return txRunner.callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
214             tx -> {
215                 FlowKey flowKey = new FlowKey(flowEntity.getId());
216                 short tableId = flowEntity.getTableId().toJava();
217                 deleteFlow(dpnId, tableId, flowKey, tx);
218             });
219     }
220
221     @VisibleForTesting
222     FluentFuture<Void> removeGroupInternal(Uint64 dpnId, long groupId) {
223         return addCallBackForInstallGroupAndReturn(txRunner
224             .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
225                 tx -> removeGroupInternal(dpnId, groupId, tx)));
226     }
227
228     private void removeGroupInternal(Uint64 dpnId, long groupId,
229                                      TypedWriteTransaction<Datastore.Configuration> tx) {
230         Node nodeDpn = buildDpnNode(dpnId);
231         if (groupExists(nodeDpn, groupId)) {
232             InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
233             tx.delete(groupInstanceId);
234         } else {
235             LOG.debug("Group {} does not exist for dpn {}", groupId, dpnId);
236         }
237     }
238
239     private static Node buildDpnNode(Uint64 dpnId) {
240         NodeId nodeId = new NodeId("openflow:" + dpnId);
241         Node nodeDpn = new NodeBuilder().setId(nodeId).withKey(new NodeKey(nodeId)).build();
242
243         return nodeDpn;
244     }
245
246     private static String getGroupKey(long groupId, Uint64 dpId) {
247         String synchronizingKey = "group-key-" + groupId + dpId;
248         return synchronizingKey.intern();
249     }
250
251     private static String getFlowKey(Uint64 dpId, short tableId, FlowKey flowKey) {
252         String synchronizingKey = "flow-key-" + dpId + tableId + flowKey;
253         return synchronizingKey.intern();
254     }
255
256     private void syncSetUpFlowInternal(FlowEntity flowEntity, boolean isRemove) {
257         if (LOG.isTraceEnabled()) {
258             LOG.trace("syncSetUpFlow for flowEntity {} ", flowEntity);
259         }
260         Flow flow = flowEntity.getFlowBuilder().build();
261         String flowId = flowEntity.getFlowId();
262         short tableId = flowEntity.getTableId();
263         Uint64 dpId = flowEntity.getDpnId();
264         FlowKey flowKey = new FlowKey(new FlowId(flowId));
265         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
266
267         if (isRemove) {
268             synchronized (getFlowKey(dpId, tableId, flowKey)) {
269                 if (flowExists(dpId, tableId, flowKey)) {
270                     MDSALUtil.syncDelete(dataBroker, LogicalDatastoreType.CONFIGURATION, flowInstanceId);
271                 } else {
272                     LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
273                 }
274             }
275         } else {
276             MDSALUtil.syncWrite(dataBroker, LogicalDatastoreType.CONFIGURATION, flowInstanceId, flow);
277         }
278     }
279
280     private void syncSetUpGroupInternal(GroupEntity groupEntity, boolean isRemove) {
281         if (LOG.isTraceEnabled()) {
282             LOG.trace("syncSetUpGroup for groupEntity {} ", groupEntity);
283         }
284         Group group = groupEntity.getGroupBuilder().build();
285         Uint64 dpId = groupEntity.getDpnId();
286         long groupId = groupEntity.getGroupId();
287         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, buildDpnNode(dpId));
288
289         if (isRemove) {
290             synchronized (getGroupKey(groupId, dpId)) {
291                 if (groupExists(dpId, groupId)) {
292                     MDSALUtil.syncDelete(dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId);
293                 } else {
294                     LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
295                 }
296             }
297         } else {
298             MDSALUtil.syncWrite(dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId, group);
299         }
300     }
301
302     private class GroupListener extends AbstractClusteredAsyncDataTreeChangeListener<Group> {
303
304         GroupListener() {
305             super(dataBroker, LogicalDatastoreType.OPERATIONAL,InstanceIdentifier.create(Nodes.class).child(Node.class)
306                     .augmentation(FlowCapableNode.class).child(Group.class),
307                     Executors.newSingleThreadExecutor("GroupListener", LOG));
308         }
309
310         @Override
311         public void remove(InstanceIdentifier<Group> identifier, Group del) {
312             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
313             executeNotifyTaskIfRequired(dpId, del);
314         }
315
316         private void executeNotifyTaskIfRequired(Uint64 dpId, Group group) {
317             GroupInfoKey groupKey = new GroupInfoKey(dpId, group.getGroupId().getValue().toJava());
318             Runnable notifyTask = groupMap.remove(groupKey);
319             if (notifyTask == null) {
320                 return;
321             }
322             executorService.execute(notifyTask);
323         }
324
325         @Override
326         public void update(InstanceIdentifier<Group> identifier, Group original, Group update) {
327             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
328             executeNotifyTaskIfRequired(dpId, update);
329         }
330
331         @Override
332         public void add(InstanceIdentifier<Group> identifier, Group add) {
333             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
334             executeNotifyTaskIfRequired(dpId, add);
335         }
336     }
337
338     private class FlowListener extends AbstractClusteredAsyncDataTreeChangeListener<Flow> {
339
340         FlowListener() {
341             super(dataBroker, LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class).child(Node.class)
342                     .augmentation(FlowCapableNode.class).child(Table.class).child(Flow.class),
343                     Executors.newSingleThreadExecutor("FlowListener", LOG));
344         }
345
346         @Override
347         public void remove(InstanceIdentifier<Flow> identifier, Flow del) {
348             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
349             notifyTaskIfRequired(dpId, del);
350         }
351
352         private void notifyTaskIfRequired(Uint64 dpId, Flow flow) {
353             FlowInfoKey flowKey = new FlowInfoKey(dpId, flow.getTableId().toJava(),
354                                                   flow.getMatch(), flow.getId().getValue());
355             Runnable notifyTask = flowMap.remove(flowKey);
356             if (notifyTask == null) {
357                 return;
358             }
359             executorService.execute(notifyTask);
360         }
361
362         @Override
363         public void update(InstanceIdentifier<Flow> identifier, Flow original, Flow update) {
364         }
365
366         @Override
367         public void add(InstanceIdentifier<Flow> identifier, Flow add) {
368             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
369             notifyTaskIfRequired(dpId, add);
370         }
371
372     }
373
374     private class FlowConfigListener extends AbstractClusteredAsyncDataTreeChangeListener<Flow> {
375         private final Logger flowLog = LoggerFactory.getLogger(FlowConfigListener.class);
376
377         FlowConfigListener() {
378             super(dataBroker, LogicalDatastoreType.OPERATIONAL,InstanceIdentifier.create(Nodes.class).child(Node.class)
379                     .augmentation(FlowCapableNode.class).child(Table.class).child(Flow.class),
380                     Executors.newSingleThreadExecutor("FlowConfigListener", LOG));
381         }
382
383         @Override
384         public void remove(InstanceIdentifier<Flow> identifier, Flow del) {
385             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
386             flowLog.trace("FlowId {} deleted from Table {} on DPN {}",
387                 del.getId().getValue(), del.getTableId(), dpId);
388         }
389
390         @Override
391         public void update(InstanceIdentifier<Flow> identifier, Flow original, Flow update) {
392         }
393
394         @Override
395         public void add(InstanceIdentifier<Flow> identifier, Flow add) {
396             Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
397             flowLog.debug("FlowId {} added to Table {} on DPN {}",
398                 add.getId().getValue(), add.getTableId(), dpId);
399         }
400     }
401
402     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
403             justification = "https://github.com/spotbugs/spotbugs/issues/811")
404     private static Uint64 getDpnFromString(String dpnString) {
405         String[] split = dpnString.split(":");
406         return Uint64.valueOf(split[1]);
407     }
408
409     @Override
410     public FluentFuture<Void> installFlow(FlowEntity flowEntity) {
411         return installFlowInternal(flowEntity);
412     }
413
414     @Override
415     public FluentFuture<Void> installFlow(Uint64 dpId, Flow flowEntity) {
416         return installFlowInternal(dpId, flowEntity);
417     }
418
419     @Override
420     public FluentFuture<Void> installFlow(Uint64 dpId, FlowEntity flowEntity) {
421         return installFlowInternal(dpId, flowEntity.getFlowBuilder().build());
422     }
423
424     @Override
425     public ListenableFuture<Void> removeFlow(Uint64 dpId, short tableId, FlowId flowId) {
426         ListenableFuture<Void> future = txRunner.callWithNewWriteOnlyTransactionAndSubmit(
427             tx -> deleteFlow(dpId, tableId, new FlowKey(flowId), tx));
428
429         Futures.addCallback(future, new FutureCallback<Void>() {
430             @Override
431             public void onSuccess(final Void result) {
432                 // Committed successfully
433                 LOG.debug("Delete Flow -- Committed successfully");
434             }
435
436             @Override
437             public void onFailure(final Throwable throwable) {
438                 // Transaction failed
439                 if (throwable instanceof OptimisticLockFailedException) {
440                     // Failed because of concurrent transaction modifying same
441                     // data
442                     LOG.error("Delete Flow -- Failed because of concurrent transaction modifying same data");
443                 } else {
444                     // Some other type of TransactionCommitFailedException
445                     LOG.error("Delete Flow -- Some other type of TransactionCommitFailedException", throwable);
446                 }
447             }
448
449         }, MoreExecutors.directExecutor());
450
451         return future;
452     }
453
454     @Override
455     public FluentFuture<Void> removeFlow(Uint64 dpId, Flow flowEntity) {
456         return removeFlowNewInternal(dpId, flowEntity);
457     }
458
459     @Override
460     public FluentFuture<Void> removeFlow(FlowEntity flowEntity) {
461         return removeFlowInternal(flowEntity);
462     }
463
464     @Override
465     public void removeFlow(TypedReadWriteTransaction<Configuration> tx, FlowEntity flowEntity)
466             throws ExecutionException, InterruptedException {
467         removeFlow(tx, flowEntity.getDpnId(), flowEntity.getFlowId(), flowEntity.getTableId());
468     }
469
470     @Override
471     public void removeFlow(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, Flow flow)
472             throws ExecutionException, InterruptedException {
473         removeFlow(tx, dpId, flow.key(), flow.getTableId().toJava());
474     }
475
476     @Override
477     public void removeFlow(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, String flowId, short tableId)
478             throws ExecutionException, InterruptedException {
479         removeFlow(tx, dpId, new FlowKey(new FlowId(flowId)), tableId);
480     }
481
482     @Override
483     public void removeFlow(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, FlowKey flowKey,
484             short tableId) throws ExecutionException, InterruptedException {
485         InstanceIdentifier<Flow> flowInstanceIdentifier = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
486         if (tx.read(flowInstanceIdentifier).get().isPresent()) {
487             tx.delete(flowInstanceIdentifier);
488         }
489     }
490
491     @Override
492     public void removeGroup(GroupEntity groupEntity) {
493         removeGroupInternal(groupEntity.getDpnId(), groupEntity.getGroupId());
494     }
495
496     @Override
497     public void removeGroup(TypedReadWriteTransaction<Configuration> tx, GroupEntity groupEntity)
498             throws ExecutionException, InterruptedException {
499         removeGroup(tx, groupEntity.getDpnId(), groupEntity.getGroupId());
500     }
501
502     @Override
503     public void removeGroup(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, Group group)
504             throws ExecutionException, InterruptedException {
505         removeGroup(tx, dpId, group.getGroupId().getValue().toJava());
506     }
507
508     @Override
509     public void removeGroup(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, long groupId)
510             throws ExecutionException, InterruptedException {
511         Node nodeDpn = buildDpnNode(dpId);
512         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
513         if (tx.read(groupInstanceId).get().isPresent()) {
514             tx.delete(groupInstanceId);
515         } else {
516             LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
517         }
518     }
519
520     @Override
521     public void syncRemoveFlow(FlowEntity flowEntity, long delayTime) {
522         syncSetUpFlowInternal(flowEntity, true);
523     }
524
525     @Override
526     public void syncRemoveFlow(FlowEntity flowEntity) {
527         syncSetUpFlowInternal(flowEntity, true);
528     }
529
530     @Override
531     public void syncInstallFlow(FlowEntity flowEntity, long delayTime) {
532         syncSetUpFlowInternal(flowEntity, false);
533     }
534
535     @Override
536     public void syncInstallFlow(FlowEntity flowEntity) {
537         syncSetUpFlowInternal(flowEntity, false);
538     }
539
540     @Override
541     public void syncInstallGroup(GroupEntity groupEntity) {
542         syncSetUpGroupInternal(groupEntity, false);
543     }
544
545     @Override
546     public void syncRemoveGroup(GroupEntity groupEntity) {
547         syncSetUpGroupInternal(groupEntity, true);
548     }
549
550     @Override
551     public void addFlow(TypedWriteTransaction<Configuration> tx, FlowEntity flowEntity) {
552         addFlow(tx, flowEntity.getDpnId(), flowEntity.getFlowBuilder().build());
553     }
554
555     @Override
556     public void addFlow(TypedWriteTransaction<Configuration> tx, Uint64 dpId, Flow flow) {
557         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId,
558                                                             flow.getTableId().toJava(), flow.key());
559         tx.put(flowInstanceId, flow, CREATE_MISSING_PARENTS);
560     }
561
562     @Override
563     public void addGroup(TypedWriteTransaction<Configuration> tx, GroupEntity groupEntity) {
564         addGroup(tx, groupEntity.getDpnId(), groupEntity.getGroupBuilder().build());
565     }
566
567     @Override
568     public void addGroup(TypedWriteTransaction<Configuration> tx, Uint64 dpId, Group group) {
569         Node nodeDpn = buildDpnNode(dpId);
570         long groupId = group.getGroupId().getValue().toJava();
571         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
572         tx.put(groupInstanceId, group, CREATE_MISSING_PARENTS);
573     }
574
575     @Override
576     public void addBucket(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, long groupId, Bucket bucket)
577             throws ExecutionException, InterruptedException {
578         Node nodeDpn = buildDpnNode(dpId);
579         if (groupExists(tx, nodeDpn, groupId)) {
580             InstanceIdentifier<Bucket> bucketInstanceId = buildBucketInstanceIdentifier(groupId,
581                 bucket.getBucketId().getValue().toJava(), nodeDpn);
582             tx.put(bucketInstanceId, bucket);
583         }
584     }
585
586     @Override
587     public void removeBucket(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, long groupId, long bucketId)
588             throws ExecutionException, InterruptedException {
589         Node nodeDpn = buildDpnNode(dpId);
590         if (groupExists(tx, nodeDpn, groupId)) {
591             InstanceIdentifier<Bucket> bucketInstanceId = buildBucketInstanceIdentifier(groupId, bucketId, nodeDpn);
592             tx.delete(bucketInstanceId);
593         } else {
594             LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
595         }
596     }
597
598     @Override
599     public boolean groupExists(Uint64 dpId, long groupId) {
600         return groupExists(buildDpnNode(dpId), groupId);
601     }
602
603     private boolean groupExists(Node nodeDpn, long groupId) {
604         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
605         try {
606             return singleTxDb.syncReadOptional(LogicalDatastoreType.CONFIGURATION, groupInstanceId).isPresent();
607         } catch (ExecutionException | InterruptedException e) {
608             LOG.warn("Exception while reading group {} for Node {}", groupId, nodeDpn.key());
609         }
610         return false;
611     }
612
613     private static boolean groupExists(TypedReadTransaction<Configuration> tx, Node nodeDpn, long groupId)
614            throws ExecutionException, InterruptedException {
615         return tx.exists(buildGroupInstanceIdentifier(groupId, nodeDpn)).get();
616     }
617
618     private static InstanceIdentifier<Group> buildGroupInstanceIdentifier(long groupId, Node nodeDpn) {
619         InstanceIdentifier<Group> groupInstanceId = InstanceIdentifier.builder(Nodes.class)
620                 .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
621                 .child(Group.class, new GroupKey(new GroupId(groupId))).build();
622         return groupInstanceId;
623     }
624
625     private boolean flowExists(Uint64 dpId, short tableId, FlowKey flowKey) {
626         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
627         try {
628             Optional<Flow> flowOptional = singleTxDb.syncReadOptional(LogicalDatastoreType.CONFIGURATION,
629                     flowInstanceId);
630             return flowOptional.isPresent();
631         } catch (ExecutionException | InterruptedException e) {
632             LOG.warn("Exception while reading flow {} for dpn {}", flowKey, dpId);
633         }
634         return false;
635     }
636
637     private static InstanceIdentifier<Flow> buildFlowInstanceIdentifier(Uint64 dpnId, short tableId,
638             FlowKey flowKey) {
639         InstanceIdentifier<Flow> flowInstanceId = InstanceIdentifier.builder(Nodes.class)
640                 .child(Node.class, buildDpnNode(dpnId).key()).augmentation(FlowCapableNode.class)
641                 .child(Table.class, new TableKey(tableId)).child(Flow.class, flowKey).build();
642         return flowInstanceId;
643     }
644
645     private static InstanceIdentifier<Bucket> buildBucketInstanceIdentifier(long groupId, long bucketId,
646             Node nodeDpn) {
647         InstanceIdentifier<Bucket> bucketInstanceId = InstanceIdentifier.builder(Nodes.class)
648                 .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
649                 .child(Group.class, new GroupKey(new GroupId(groupId)))
650                 .child(Buckets.class)
651                 .child(Bucket.class, new BucketKey(new BucketId(bucketId))).build();
652         return bucketInstanceId;
653     }
654
655     private static FluentFuture<Void> addCallBackForDeleteFlowAndReturn(FluentFuture<Void> fluentFuture) {
656         return callBack(fluentFuture, "Delete Flow");
657     }
658
659     private static FluentFuture<Void> addCallBackForInstallFlowAndReturn(FluentFuture<Void> fluentFuture) {
660         return callBack(fluentFuture, "Install Flow");
661     }
662
663     private static FluentFuture<Void> addCallBackForInstallGroupAndReturn(FluentFuture<Void> fluentFuture) {
664         return callBack(fluentFuture, "Install Group");
665     }
666
667     // Generic for handling callbacks
668     private static FluentFuture<Void> callBack(FluentFuture<Void> fluentFuture, String log) {
669         fluentFuture.addCallback(new FutureCallback<Void>() {
670             @Override
671             public void onSuccess(final Void result) {
672                 // Committed successfully
673                 LOG.debug("{} -- Committedsuccessfully ", log);
674             }
675
676             @Override
677             public void onFailure(final Throwable throwable) {
678                 // Transaction failed
679
680                 if (throwable instanceof OptimisticLockFailedException) {
681                     // Failed because of concurrent transaction modifying same
682                     // data
683                     LOG.error("{} -- Failed because of concurrent transaction modifying same data", log);
684                 } else {
685                     // Some other type of TransactionCommitFailedException
686                     LOG.error("{} -- Some other type of TransactionCommitFailedException",log, throwable);
687                 }
688             }
689         }, MoreExecutors.directExecutor());
690         return fluentFuture;
691     }
692 }