f993902f828c31f44129e9c6f53601f10810ea0f
[genius.git] / mdsalutil / mdsalutil-impl / src / main / java / org / opendaylight / genius / mdsalutil / internal / MDSALManager.java
1 /*
2  * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.genius.mdsalutil.internal;
10
11 import static org.opendaylight.controller.md.sal.binding.api.WriteTransaction.CREATE_MISSING_PARENTS;
12 import static org.opendaylight.infrautils.utils.concurrent.Executors.newListeningSingleThreadExecutor;
13 import static org.opendaylight.infrautils.utils.concurrent.FluentFutures2.toChecked;
14
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.Optional;
17 import com.google.common.util.concurrent.CheckedFuture;
18 import com.google.common.util.concurrent.FluentFuture;
19 import com.google.common.util.concurrent.FutureCallback;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import com.google.common.util.concurrent.MoreExecutors;
23 import java.math.BigInteger;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ConcurrentMap;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.ExecutorService;
30 import javax.inject.Inject;
31 import javax.inject.Singleton;
32 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
33 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
35 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
36 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
37 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
38 import org.opendaylight.genius.datastoreutils.AsyncClusteredDataTreeChangeListenerBase;
39 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
40 import org.opendaylight.genius.infra.Datastore;
41 import org.opendaylight.genius.infra.Datastore.Configuration;
42 import org.opendaylight.genius.infra.RetryingManagedNewTransactionRunner;
43 import org.opendaylight.genius.infra.TypedReadTransaction;
44 import org.opendaylight.genius.infra.TypedReadWriteTransaction;
45 import org.opendaylight.genius.infra.TypedWriteTransaction;
46 import org.opendaylight.genius.mdsalutil.ActionInfo;
47 import org.opendaylight.genius.mdsalutil.FlowEntity;
48 import org.opendaylight.genius.mdsalutil.FlowInfoKey;
49 import org.opendaylight.genius.mdsalutil.GroupEntity;
50 import org.opendaylight.genius.mdsalutil.GroupInfoKey;
51 import org.opendaylight.genius.mdsalutil.MDSALUtil;
52 import org.opendaylight.genius.mdsalutil.actions.ActionGroup;
53 import org.opendaylight.genius.mdsalutil.interfaces.IMdsalApiManager;
54 import org.opendaylight.infrautils.inject.AbstractLifecycle;
55 import org.opendaylight.infrautils.utils.concurrent.JdkFutures;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.BucketId;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.BucketKey;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.TransmitPacketOutput;
81 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
82 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
83 import org.opendaylight.yangtools.yang.common.RpcResult;
84 import org.slf4j.Logger;
85 import org.slf4j.LoggerFactory;
86
87 @Singleton
88 @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
89 public class MDSALManager extends AbstractLifecycle implements IMdsalApiManager {
90
91     private static final Logger LOG = LoggerFactory.getLogger(MDSALManager.class);
92
93     private final DataBroker dataBroker;
94     private final RetryingManagedNewTransactionRunner txRunner;
95     private final FlowBatchingUtils flowBatchingUtils = new FlowBatchingUtils();
96
97     private final PacketProcessingService packetProcessingService;
98     private final ConcurrentMap<FlowInfoKey, Runnable> flowMap = new ConcurrentHashMap<>();
99     private final ConcurrentMap<GroupInfoKey, Runnable> groupMap = new ConcurrentHashMap<>();
100     private final ExecutorService executorService = newListeningSingleThreadExecutor("genius-MDSALManager", LOG);
101     private final SingleTransactionDataBroker singleTxDb;
102     private final FlowListener flowListener = new FlowListener();
103     private final FlowConfigListener flowConfigListener = new FlowConfigListener();
104     private final GroupListener groupListener = new GroupListener();
105
106     /**
107      * Writes the flows and Groups to the MD SAL DataStore which will be sent to
108      * the openflowplugin for installing flows/groups on the switch. Other
109      * modules of VPN service that wants to install flows / groups on the switch
110      * uses this utility
111      *
112      * @param db
113      *            dataBroker reference
114      * @param pktProcService
115      *            PacketProcessingService for sending the packet outs
116      */
117     @Inject
118     public MDSALManager(DataBroker db, PacketProcessingService pktProcService) {
119         this.dataBroker = db;
120         this.txRunner = new RetryingManagedNewTransactionRunner(db);
121         this.packetProcessingService = pktProcService;
122         singleTxDb = new SingleTransactionDataBroker(dataBroker);
123         LOG.info("MDSAL Manager Initialized ");
124     }
125
126     @Override
127     protected void start() throws Exception {
128         LOG.info("{} start", getClass().getSimpleName());
129
130         int batchSize = Integer.getInteger("batch.size", 1000);
131         int batchInterval = Integer.getInteger("batch.wait.time", 500);
132
133         flowBatchingUtils.registerWithBatchManager(new MdSalUtilBatchHandler(dataBroker, batchSize, batchInterval));
134         flowListener.registerListener(LogicalDatastoreType.OPERATIONAL, dataBroker);
135         flowConfigListener.registerListener(LogicalDatastoreType.CONFIGURATION, dataBroker);
136         groupListener.registerListener(LogicalDatastoreType.OPERATIONAL, dataBroker);
137     }
138
139     @Override
140     protected void stop() throws Exception {
141         LOG.info("{} stop", getClass().getSimpleName());
142
143         flowListener.close();
144         flowConfigListener.close();
145         groupListener.close();
146     }
147
148     @VisibleForTesting
149     FluentFuture<Void> installFlowInternal(FlowEntity flowEntity) {
150         return addCallBackForInstallFlowAndReturn(txRunner
151             .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
152                 tx -> writeFlowEntityInternal(flowEntity, tx)));
153     }
154
155     private FluentFuture<Void> installFlowInternal(BigInteger dpId, Flow flow) {
156         return txRunner.callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
157             tx -> writeFlowInternal(dpId, flow, tx));
158     }
159
160     private void writeFlowEntityInternal(FlowEntity flowEntity, WriteTransaction tx) {
161         FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
162         FlowBuilder flowbld = flowEntity.getFlowBuilder();
163         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(flowEntity.getDpnId(),
164                 flowEntity.getTableId(), flowKey);
165         tx.put(LogicalDatastoreType.CONFIGURATION, flowInstanceId, flowbld.build(), true);
166     }
167
168     private void writeFlowEntityInternal(FlowEntity flowEntity, TypedWriteTransaction<Datastore.Configuration> tx) {
169         FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
170         FlowBuilder flowbld = flowEntity.getFlowBuilder();
171         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(flowEntity.getDpnId(),
172                 flowEntity.getTableId(), flowKey);
173         tx.put(flowInstanceId, flowbld.build(), true);
174     }
175
176     private void writeFlowInternal(BigInteger dpId, Flow flow, WriteTransaction tx) {
177         FlowKey flowKey = new FlowKey(new FlowId(flow.getId()));
178         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, flow.getTableId(), flowKey);
179         tx.put(LogicalDatastoreType.CONFIGURATION, flowInstanceId, flow, true);
180     }
181
182     private void writeFlowInternal(BigInteger dpId, Flow flow, TypedWriteTransaction<Datastore.Configuration> tx) {
183         FlowKey flowKey = new FlowKey(new FlowId(flow.getId()));
184         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, flow.getTableId(), flowKey);
185         tx.put(flowInstanceId, flow, true);
186     }
187
188     private void batchedAddFlowInternal(BigInteger dpId, Flow flow) {
189         FlowKey flowKey = new FlowKey(new FlowId(flow.getId()));
190         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, flow.getTableId(), flowKey);
191         flowBatchingUtils.write(flowInstanceId, flow);
192     }
193
194     private void batchedRemoveFlowInternal(BigInteger dpId, Flow flow) {
195         FlowKey flowKey = new FlowKey(new FlowId(flow.getId()));
196         short tableId = flow.getTableId();
197         if (flowExists(dpId, tableId, flowKey)) {
198             InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
199             flowBatchingUtils.delete(flowInstanceId);
200         } else {
201             LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
202         }
203     }
204
205     @VisibleForTesting
206     FluentFuture<Void> installGroupInternal(GroupEntity groupEntity) {
207         return addCallBackForInstallGroupAndReturn(txRunner
208             .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
209                 tx -> writeGroupEntityInternal(groupEntity, tx)));
210     }
211
212     private void writeGroupEntityInternal(GroupEntity groupEntity, WriteTransaction tx) {
213         Group group = groupEntity.getGroupBuilder().build();
214         Node nodeDpn = buildDpnNode(groupEntity.getDpnId());
215         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupEntity.getGroupId(), nodeDpn);
216         tx.put(LogicalDatastoreType.CONFIGURATION, groupInstanceId, group, true);
217     }
218
219     private void writeGroupEntityInternal(GroupEntity groupEntity, TypedWriteTransaction<Datastore.Configuration> tx) {
220         Group group = groupEntity.getGroupBuilder().build();
221         Node nodeDpn = buildDpnNode(groupEntity.getDpnId());
222         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupEntity.getGroupId(), nodeDpn);
223         tx.put(groupInstanceId, group, true);
224     }
225
226     private void writeGroupInternal(BigInteger dpId, Group group, WriteTransaction tx) {
227         Node nodeDpn = buildDpnNode(dpId);
228         long groupId = group.getGroupId().getValue();
229         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
230         tx.put(LogicalDatastoreType.CONFIGURATION, groupInstanceId, group, true);
231     }
232
233     @VisibleForTesting
234     FluentFuture<Void> removeFlowInternal(FlowEntity flowEntity) {
235         return addCallBackForDeleteFlowAndReturnm(txRunner
236                 .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
237                     tx -> deleteFlowEntityInternal(flowEntity, tx)));
238     }
239
240     private void deleteFlowEntityInternal(FlowEntity flowEntity, WriteTransaction tx) {
241         BigInteger dpId = flowEntity.getDpnId();
242         short tableId = flowEntity.getTableId();
243         FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
244         deleteFlow(dpId, tableId, flowKey, tx);
245     }
246
247     private void deleteFlowEntityInternal(FlowEntity flowEntity, TypedWriteTransaction<Datastore.Configuration> tx) {
248         BigInteger dpId = flowEntity.getDpnId();
249         short tableId = flowEntity.getTableId();
250         FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
251         deleteFlow(dpId, tableId, flowKey, tx);
252     }
253
254     private void deleteFlow(BigInteger dpId, short tableId, FlowKey flowKey, WriteTransaction tx) {
255         if (flowExists(dpId, tableId, flowKey)) {
256             InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
257             tx.delete(LogicalDatastoreType.CONFIGURATION, flowInstanceId);
258         } else {
259             LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
260         }
261     }
262
263     private void deleteFlow(BigInteger dpId, short tableId, FlowKey flowKey,
264                             TypedWriteTransaction<Datastore.Configuration> tx) {
265         if (flowExists(dpId, tableId, flowKey)) {
266             InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
267             tx.delete(flowInstanceId);
268         } else {
269             LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
270         }
271     }
272
273     private FluentFuture<Void> removeFlowNewInternal(BigInteger dpnId, Flow flowEntity) {
274         LOG.debug("Remove flow {}", flowEntity);
275         return txRunner.callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
276             tx -> {
277                 FlowKey flowKey = new FlowKey(flowEntity.getId());
278                 short tableId = flowEntity.getTableId();
279                 deleteFlow(dpnId, tableId, flowKey, tx);
280             });
281     }
282
283     private void deleteFlowInternal(BigInteger dpId, Flow flow, WriteTransaction tx) {
284         FlowKey flowKey = new FlowKey(flow.getId());
285         short tableId = flow.getTableId();
286         deleteFlow(dpId, tableId, flowKey, tx);
287     }
288
289     @VisibleForTesting
290     FluentFuture<Void> removeGroupInternal(BigInteger dpnId, long groupId) {
291         return addCallBackForInstallGroupAndReturn(txRunner
292             .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
293                 tx -> removeGroupInternal(dpnId, groupId, tx)));
294     }
295
296     private void removeGroupInternal(BigInteger dpnId, long groupId, WriteTransaction tx) {
297         Node nodeDpn = buildDpnNode(dpnId);
298         if (groupExists(nodeDpn, groupId)) {
299             InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
300             tx.delete(LogicalDatastoreType.CONFIGURATION, groupInstanceId);
301         } else {
302             LOG.debug("Group {} does not exist for dpn {}", groupId, dpnId);
303         }
304     }
305
306     private void removeGroupInternal(BigInteger dpnId, long groupId,
307                                      TypedWriteTransaction<Datastore.Configuration> tx) {
308         Node nodeDpn = buildDpnNode(dpnId);
309         if (groupExists(nodeDpn, groupId)) {
310             InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
311             tx.delete(groupInstanceId);
312         } else {
313             LOG.debug("Group {} does not exist for dpn {}", groupId, dpnId);
314         }
315     }
316
317
318     private void modifyGroupInternal(GroupEntity groupEntity) {
319
320         installGroup(groupEntity);
321     }
322
323     private void sendPacketOutInternal(BigInteger dpnId, int groupId, byte[] payload) {
324
325         List<ActionInfo> actionInfos = new ArrayList<>();
326         actionInfos.add(new ActionGroup(groupId));
327
328         sendPacketOutWithActions(dpnId, payload, actionInfos);
329     }
330
331     private void sendPacketOutWithActionsInternal(BigInteger dpnId, byte[] payload, List<ActionInfo> actionInfos) {
332         ListenableFuture<RpcResult<TransmitPacketOutput>> future = packetProcessingService.transmitPacket(
333                 MDSALUtil.getPacketOut(actionInfos, payload, dpnId,
334                         getNodeConnRef("openflow:" + dpnId, "0xfffffffd")));
335         JdkFutures.addErrorLogging(future, LOG, "Transmit packet");
336     }
337
338     private void sendARPPacketOutWithActionsInternal(BigInteger dpnId, byte[] payload, List<ActionInfo> actions) {
339         sendPacketOutWithActionsInternal(dpnId, payload, actions);
340     }
341
342     protected InstanceIdentifier<Node> nodeToInstanceId(Node node) {
343         return InstanceIdentifier.builder(Nodes.class).child(Node.class, node.key()).build();
344     }
345
346     private static NodeConnectorRef getNodeConnRef(final String nodeId, final String port) {
347         StringBuilder stringBuilder = new StringBuilder(nodeId);
348         StringBuilder append = stringBuilder.append(":");
349         StringBuilder build = append.append(port);
350         String string = build.toString();
351         NodeConnectorId nodeConnectorId = new NodeConnectorId(string);
352         NodeConnectorKey nodeConnectorKey = new NodeConnectorKey(nodeConnectorId);
353         NodeConnectorKey connectorKey = nodeConnectorKey;
354         InstanceIdentifierBuilder<Nodes> builder = InstanceIdentifier.builder(Nodes.class);
355
356         NodeKey nodeKey = new NodeKey(new NodeId(nodeId));
357         InstanceIdentifierBuilder<Node> child = builder.child(Node.class, nodeKey);
358         InstanceIdentifierBuilder<NodeConnector> anotherChild = child.child(NodeConnector.class, connectorKey);
359         InstanceIdentifier<NodeConnector> path = anotherChild.build();
360         NodeConnectorRef nodeConnectorRef = new NodeConnectorRef(path);
361         return nodeConnectorRef;
362     }
363
364     private static Node buildDpnNode(BigInteger dpnId) {
365         NodeId nodeId = new NodeId("openflow:" + dpnId);
366         Node nodeDpn = new NodeBuilder().setId(nodeId).withKey(new NodeKey(nodeId)).build();
367
368         return nodeDpn;
369     }
370
371     private String getGroupKey(long groupId, BigInteger dpId) {
372         String synchronizingKey = "group-key-" + groupId + dpId;
373         return synchronizingKey.intern();
374     }
375
376     private String getFlowKey(BigInteger dpId, short tableId, FlowKey flowKey) {
377         String synchronizingKey = "flow-key-" + dpId + tableId + flowKey;
378         return synchronizingKey.intern();
379     }
380
381     private void syncSetUpFlowInternal(FlowEntity flowEntity, boolean isRemove) {
382         if (LOG.isTraceEnabled()) {
383             LOG.trace("syncSetUpFlow for flowEntity {} ", flowEntity);
384         }
385         Flow flow = flowEntity.getFlowBuilder().build();
386         String flowId = flowEntity.getFlowId();
387         short tableId = flowEntity.getTableId();
388         BigInteger dpId = flowEntity.getDpnId();
389         FlowKey flowKey = new FlowKey(new FlowId(flowId));
390         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
391
392         if (isRemove) {
393             synchronized (getFlowKey(dpId, tableId, flowKey)) {
394                 if (flowExists(dpId, tableId, flowKey)) {
395                     MDSALUtil.syncDelete(dataBroker, LogicalDatastoreType.CONFIGURATION, flowInstanceId);
396                 } else {
397                     LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
398                 }
399             }
400         } else {
401             MDSALUtil.syncWrite(dataBroker, LogicalDatastoreType.CONFIGURATION, flowInstanceId, flow);
402         }
403     }
404
405     private void syncSetUpGroupInternal(GroupEntity groupEntity, boolean isRemove) {
406         if (LOG.isTraceEnabled()) {
407             LOG.trace("syncSetUpGroup for groupEntity {} ", groupEntity);
408         }
409         Group group = groupEntity.getGroupBuilder().build();
410         BigInteger dpId = groupEntity.getDpnId();
411         long groupId = groupEntity.getGroupId();
412         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, buildDpnNode(dpId));
413
414         if (isRemove) {
415             synchronized (getGroupKey(groupId, dpId)) {
416                 if (groupExists(dpId, groupId)) {
417                     MDSALUtil.syncDelete(dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId);
418                 } else {
419                     LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
420                 }
421             }
422         } else {
423             MDSALUtil.syncWrite(dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId, group);
424         }
425     }
426
427     private void syncSetUpGroupInternal(BigInteger dpId, Group group, boolean isRemove) {
428         LOG.trace("syncSetUpGroup for group {} ", group);
429         long groupId = group.getGroupId().getValue();
430         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, buildDpnNode(dpId));
431
432         if (isRemove) {
433             synchronized (getGroupKey(groupId, dpId)) {
434                 if (groupExists(dpId, groupId)) {
435                     MDSALUtil.syncDelete(dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId);
436                 } else {
437                     LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
438                 }
439             }
440         } else {
441             MDSALUtil.syncWrite(dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId, group);
442         }
443     }
444
445     private class GroupListener extends AsyncClusteredDataTreeChangeListenerBase<Group, GroupListener> {
446
447         GroupListener() {
448             super(Group.class, GroupListener.class);
449         }
450
451         @Override
452         protected void remove(InstanceIdentifier<Group> identifier, Group del) {
453             BigInteger dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
454             executeNotifyTaskIfRequired(dpId, del);
455         }
456
457         private void executeNotifyTaskIfRequired(BigInteger dpId, Group group) {
458             GroupInfoKey groupKey = new GroupInfoKey(dpId, group.getGroupId().getValue());
459             Runnable notifyTask = groupMap.remove(groupKey);
460             if (notifyTask == null) {
461                 return;
462             }
463             executorService.execute(notifyTask);
464         }
465
466         @Override
467         protected void update(InstanceIdentifier<Group> identifier, Group original, Group update) {
468             BigInteger dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
469             executeNotifyTaskIfRequired(dpId, update);
470         }
471
472         @Override
473         protected void add(InstanceIdentifier<Group> identifier, Group add) {
474             BigInteger dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
475             executeNotifyTaskIfRequired(dpId, add);
476         }
477
478         @Override
479         protected InstanceIdentifier<Group> getWildCardPath() {
480             return InstanceIdentifier.create(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class)
481                     .child(Group.class);
482         }
483
484         @Override
485         protected GroupListener getDataTreeChangeListener() {
486             return GroupListener.this;
487         }
488     }
489
490     private class FlowListener extends AsyncClusteredDataTreeChangeListenerBase<Flow, FlowListener> {
491
492         FlowListener() {
493             super(Flow.class, FlowListener.class);
494         }
495
496         @Override
497         protected void remove(InstanceIdentifier<Flow> identifier, Flow del) {
498             BigInteger dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
499             notifyTaskIfRequired(dpId, del);
500         }
501
502         private void notifyTaskIfRequired(BigInteger dpId, Flow flow) {
503             FlowInfoKey flowKey = new FlowInfoKey(dpId, flow.getTableId(), flow.getMatch(), flow.getId().getValue());
504             Runnable notifyTask = flowMap.remove(flowKey);
505             if (notifyTask == null) {
506                 return;
507             }
508             executorService.execute(notifyTask);
509         }
510
511         @Override
512         protected void update(InstanceIdentifier<Flow> identifier, Flow original, Flow update) {
513         }
514
515         @Override
516         protected void add(InstanceIdentifier<Flow> identifier, Flow add) {
517             BigInteger dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
518             notifyTaskIfRequired(dpId, add);
519         }
520
521         @Override
522         protected InstanceIdentifier<Flow> getWildCardPath() {
523             return InstanceIdentifier.create(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class)
524                     .child(Table.class).child(Flow.class);
525         }
526
527         @Override
528         protected FlowListener getDataTreeChangeListener() {
529             return FlowListener.this;
530         }
531     }
532
533     private class FlowConfigListener extends AsyncClusteredDataTreeChangeListenerBase<Flow, FlowConfigListener> {
534         private final Logger flowLog = LoggerFactory.getLogger(FlowConfigListener.class);
535
536         FlowConfigListener() {
537             super(Flow.class, FlowConfigListener.class);
538         }
539
540         @Override
541         protected void remove(InstanceIdentifier<Flow> identifier, Flow del) {
542             BigInteger dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
543             flowLog.trace("FlowId {} deleted from Table {} on DPN {}",
544                 del.getId().getValue(), del.getTableId(), dpId);
545         }
546
547         @Override
548         protected void update(InstanceIdentifier<Flow> identifier, Flow original, Flow update) {
549         }
550
551         @Override
552         protected void add(InstanceIdentifier<Flow> identifier, Flow add) {
553             BigInteger dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
554             flowLog.debug("FlowId {} added to Table {} on DPN {}",
555                 add.getId().getValue(), add.getTableId(), dpId);
556         }
557
558         @Override
559         protected InstanceIdentifier<Flow> getWildCardPath() {
560             return InstanceIdentifier.create(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class)
561                 .child(Table.class).child(Flow.class);
562         }
563
564         @Override
565         protected FlowConfigListener getDataTreeChangeListener() {
566             return FlowConfigListener.this;
567         }
568     }
569
570     private static BigInteger getDpnFromString(String dpnString) {
571         String[] split = dpnString.split(":");
572         return new BigInteger(split[1]);
573     }
574
575     @Override
576     public CheckedFuture<Void, TransactionCommitFailedException> installFlow(FlowEntity flowEntity) {
577         return toChecked(installFlowInternal(flowEntity),
578             t -> new TransactionCommitFailedException("installFlow failed", t));
579     }
580
581     @Override
582     public CheckedFuture<Void, TransactionCommitFailedException> installFlow(BigInteger dpId, Flow flowEntity) {
583         return toChecked(installFlowInternal(dpId, flowEntity),
584             t -> new TransactionCommitFailedException("installFlow failed", t));
585     }
586
587     @Override
588     public CheckedFuture<Void, TransactionCommitFailedException> installFlow(BigInteger dpId, FlowEntity flowEntity) {
589         return toChecked(installFlowInternal(dpId, flowEntity.getFlowBuilder().build()),
590             t -> new TransactionCommitFailedException("installFlow failed", t));
591     }
592
593     @Override
594     public ListenableFuture<Void> removeFlow(BigInteger dpId, short tableId, FlowId flowId) {
595         ListenableFuture<Void> future = txRunner.callWithNewWriteOnlyTransactionAndSubmit(
596             tx -> deleteFlow(dpId, tableId, new FlowKey(flowId), tx));
597
598         Futures.addCallback(future, new FutureCallback<Void>() {
599             @Override
600             public void onSuccess(final Void result) {
601                 // Committed successfully
602                 LOG.debug("Delete Flow -- Committed successfully");
603             }
604
605             @Override
606             public void onFailure(final Throwable throwable) {
607                 // Transaction failed
608                 if (throwable instanceof OptimisticLockFailedException) {
609                     // Failed because of concurrent transaction modifying same
610                     // data
611                     LOG.error("Delete Flow -- Failed because of concurrent transaction modifying same data");
612                 } else {
613                     // Some other type of TransactionCommitFailedException
614                     LOG.error("Delete Flow -- Some other type of TransactionCommitFailedException", throwable);
615                 }
616             }
617
618         }, MoreExecutors.directExecutor());
619
620         return future;
621     }
622
623     @Override
624     public CheckedFuture<Void, TransactionCommitFailedException> removeFlow(BigInteger dpId, Flow flowEntity) {
625         return toChecked(removeFlowNewInternal(dpId, flowEntity),
626             t -> new TransactionCommitFailedException("removeFlow failed", t));
627     }
628
629     @Override
630     public CheckedFuture<Void, TransactionCommitFailedException> removeFlow(BigInteger dpId, FlowEntity flowEntity) {
631         return toChecked(removeFlowNewInternal(dpId, flowEntity.getFlowBuilder().build()),
632             t -> new TransactionCommitFailedException("removeFlow failed", t));
633     }
634
635     @Override
636     public CheckedFuture<Void, TransactionCommitFailedException> removeFlow(FlowEntity flowEntity) {
637         return toChecked(removeFlowInternal(flowEntity),
638             t -> new TransactionCommitFailedException("removeFlow failed", t));
639     }
640
641     @Override
642     public void removeFlow(TypedReadWriteTransaction<Configuration> tx, FlowEntity flowEntity)
643             throws ExecutionException, InterruptedException {
644         removeFlow(tx, flowEntity.getDpnId(), flowEntity.getFlowId(), flowEntity.getTableId());
645     }
646
647     @Override
648     public void removeFlow(TypedReadWriteTransaction<Configuration> tx, BigInteger dpId, Flow flow)
649             throws ExecutionException, InterruptedException {
650         removeFlow(tx, dpId, flow.key(), flow.getTableId());
651     }
652
653     @Override
654     public void removeFlow(TypedReadWriteTransaction<Configuration> tx, BigInteger dpId, String flowId, short tableId)
655             throws ExecutionException, InterruptedException {
656         removeFlow(tx, dpId, new FlowKey(new FlowId(flowId)), tableId);
657     }
658
659     @Override
660     public void removeFlow(TypedReadWriteTransaction<Configuration> tx, BigInteger dpId, FlowKey flowKey,
661             short tableId) throws ExecutionException, InterruptedException {
662         InstanceIdentifier<Flow> flowInstanceIdentifier = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
663         if (tx.read(flowInstanceIdentifier).get().isPresent()) {
664             tx.delete(flowInstanceIdentifier);
665         }
666     }
667
668     @Override
669     public void installGroup(GroupEntity groupEntity) {
670         installGroupInternal(groupEntity);
671     }
672
673     @Override
674     public void modifyGroup(GroupEntity groupEntity) {
675         modifyGroupInternal(groupEntity);
676     }
677
678     @Override
679     public void removeGroup(GroupEntity groupEntity) {
680         removeGroupInternal(groupEntity.getDpnId(), groupEntity.getGroupId());
681     }
682
683     @Override
684     public void removeGroup(BigInteger dpnId, long groupId) {
685         removeGroupInternal(dpnId, groupId);
686     }
687
688     @Override
689     public void removeGroup(BigInteger dpnId, long groupId, WriteTransaction tx) {
690         removeGroupInternal(dpnId, groupId, tx);
691     }
692
693     @Override
694     public void removeGroup(TypedReadWriteTransaction<Configuration> tx, GroupEntity groupEntity)
695             throws ExecutionException, InterruptedException {
696         removeGroup(tx, groupEntity.getDpnId(), groupEntity.getGroupId());
697     }
698
699     @Override
700     public void removeGroup(TypedReadWriteTransaction<Configuration> tx, BigInteger dpId, Group group)
701             throws ExecutionException, InterruptedException {
702         removeGroup(tx, dpId, group.getGroupId().getValue());
703     }
704
705     @Override
706     public void removeGroup(TypedReadWriteTransaction<Configuration> tx, BigInteger dpId, long groupId)
707             throws ExecutionException, InterruptedException {
708         Node nodeDpn = buildDpnNode(dpId);
709         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
710         if (tx.read(groupInstanceId).get().isPresent()) {
711             tx.delete(groupInstanceId);
712         } else {
713             LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
714         }
715     }
716
717     @Override
718     public void sendPacketOut(BigInteger dpnId, int groupId, byte[] payload) {
719         sendPacketOutInternal(dpnId, groupId, payload);
720     }
721
722     @Override
723     public void sendPacketOutWithActions(BigInteger dpnId, long groupId, byte[] payload, List<ActionInfo> actionInfos) {
724         sendPacketOutWithActionsInternal(dpnId, payload, actionInfos);
725     }
726
727     @Override
728     public void sendPacketOutWithActions(BigInteger dpnId, byte[] payload, List<ActionInfo> actionInfos) {
729         sendPacketOutWithActionsInternal(dpnId, payload, actionInfos);
730     }
731
732     @Override
733     public void sendARPPacketOutWithActions(BigInteger dpnId, byte[] payload, List<ActionInfo> actionInfo) {
734         sendARPPacketOutWithActionsInternal(dpnId, payload, actionInfo);
735     }
736
737     @Override
738     public void syncRemoveFlow(FlowEntity flowEntity, long delayTime) {
739         syncSetUpFlowInternal(flowEntity, true);
740     }
741
742     @Override
743     public void syncRemoveFlow(FlowEntity flowEntity) {
744         syncSetUpFlowInternal(flowEntity, true);
745     }
746
747     @Override
748     public void syncInstallFlow(FlowEntity flowEntity, long delayTime) {
749         syncSetUpFlowInternal(flowEntity, false);
750     }
751
752     @Override
753     public void syncInstallFlow(FlowEntity flowEntity) {
754         syncSetUpFlowInternal(flowEntity, false);
755     }
756
757     @Override
758     public void syncInstallGroup(GroupEntity groupEntity, long delayTime) {
759         syncSetUpGroupInternal(groupEntity, false);
760     }
761
762     @Override
763     public void syncInstallGroup(GroupEntity groupEntity) {
764         syncSetUpGroupInternal(groupEntity, false);
765     }
766
767     @Override
768     public void syncInstallGroup(BigInteger dpId, Group group, long delayTime) {
769         syncSetUpGroupInternal(dpId, group, false);
770     }
771
772     @Override
773     public void syncInstallGroup(BigInteger dpId, Group group) {
774         syncSetUpGroupInternal(dpId, group, false);
775     }
776
777     @Override
778     public void syncRemoveGroup(GroupEntity groupEntity) {
779         syncSetUpGroupInternal(groupEntity, true);
780     }
781
782     @Override
783     public void syncRemoveGroup(BigInteger dpId, Group group) {
784         syncSetUpGroupInternal(dpId, group, true);
785     }
786
787     @Override
788     public void addFlowToTx(FlowEntity flowEntity, WriteTransaction tx) {
789         writeFlowEntityInternal(flowEntity, tx);
790     }
791
792     @Override
793     public void addFlowToTx(BigInteger dpId, Flow flow, WriteTransaction tx) {
794         writeFlowInternal(dpId, flow, tx);
795     }
796
797     @Override
798     public void addFlow(TypedWriteTransaction<Configuration> tx, FlowEntity flowEntity) {
799         addFlow(tx, flowEntity.getDpnId(), flowEntity.getFlowBuilder().build());
800     }
801
802     @Override
803     public void addFlow(TypedWriteTransaction<Configuration> tx, BigInteger dpId, Flow flow) {
804         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, flow.getTableId(), flow.key());
805         tx.put(flowInstanceId, flow, CREATE_MISSING_PARENTS);
806     }
807
808     @Override
809     public void removeFlowToTx(BigInteger dpId, Flow flow, WriteTransaction tx) {
810         deleteFlowInternal(dpId, flow, tx);
811     }
812
813     @Override
814     public void removeFlowToTx(FlowEntity flowEntity, WriteTransaction tx) {
815         deleteFlowEntityInternal(flowEntity, tx);
816     }
817
818     @Override
819     public void addGroupToTx(GroupEntity groupEntity, WriteTransaction tx) {
820         writeGroupEntityInternal(groupEntity, tx);
821     }
822
823     @Override
824     public void addGroupToTx(BigInteger dpId, Group group, WriteTransaction tx) {
825         writeGroupInternal(dpId, group, tx);
826     }
827
828     @Override
829     public void addGroup(TypedWriteTransaction<Configuration> tx, GroupEntity groupEntity) {
830         addGroup(tx, groupEntity.getDpnId(), groupEntity.getGroupBuilder().build());
831     }
832
833     @Override
834     public void addGroup(TypedWriteTransaction<Configuration> tx, BigInteger dpId, Group group) {
835         Node nodeDpn = buildDpnNode(dpId);
836         long groupId = group.getGroupId().getValue();
837         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
838         tx.put(groupInstanceId, group, CREATE_MISSING_PARENTS);
839     }
840
841     @Override
842     public void removeGroupToTx(GroupEntity groupEntity, WriteTransaction tx) {
843         removeGroupInternal(groupEntity.getDpnId(), groupEntity.getGroupId(), tx);
844     }
845
846     @Override
847     public void removeGroupToTx(BigInteger dpId, Group group, WriteTransaction tx) {
848         removeGroupInternal(dpId, group.getGroupId().getValue(), tx);
849     }
850
851     @Override
852     public void batchedAddFlow(BigInteger dpId, FlowEntity flowEntity) {
853         batchedAddFlowInternal(dpId, flowEntity.getFlowBuilder().build());
854     }
855
856     @Override
857     public void batchedAddFlow(BigInteger dpId, Flow flow) {
858         batchedAddFlowInternal(dpId, flow);
859     }
860
861     @Override
862     public void batchedRemoveFlow(BigInteger dpId, FlowEntity flowEntity) {
863         batchedRemoveFlowInternal(dpId, flowEntity.getFlowBuilder().build());
864     }
865
866     @Override
867     public void batchedRemoveFlow(BigInteger dpId, Flow flow) {
868         batchedRemoveFlowInternal(dpId, flow);
869     }
870
871     @Override
872     public void addBucketToTx(BigInteger dpId, long groupId, Bucket bucket, WriteTransaction tx) {
873         addBucket(dpId, groupId, bucket, tx);
874     }
875
876     @Override
877     public void addBucket(TypedReadWriteTransaction<Configuration> tx, BigInteger dpId, long groupId, Bucket bucket)
878             throws ExecutionException, InterruptedException {
879         Node nodeDpn = buildDpnNode(dpId);
880         if (groupExists(tx, nodeDpn, groupId)) {
881             InstanceIdentifier<Bucket> bucketInstanceId = buildBucketInstanceIdentifier(groupId,
882                 bucket.getBucketId().getValue(), nodeDpn);
883             tx.put(bucketInstanceId, bucket);
884         }
885     }
886
887     private void addBucket(BigInteger dpId, long groupId, Bucket bucket, WriteTransaction tx) {
888         Node nodeDpn = buildDpnNode(dpId);
889         if (groupExists(nodeDpn, groupId)) {
890             InstanceIdentifier<Bucket> bucketInstanceId = buildBucketInstanceIdentifier(groupId,
891                 bucket.getBucketId().getValue(), nodeDpn);
892             tx.put(LogicalDatastoreType.CONFIGURATION, bucketInstanceId, bucket);
893         }
894     }
895
896     @Override
897     public void removeBucketToTx(BigInteger dpId, long groupId, long bucketId, WriteTransaction tx) {
898         deleteBucket(dpId, groupId, bucketId, tx);
899     }
900
901     @Override
902     public void removeBucket(TypedReadWriteTransaction<Configuration> tx, BigInteger dpId, long groupId, long bucketId)
903             throws ExecutionException, InterruptedException {
904         Node nodeDpn = buildDpnNode(dpId);
905         if (groupExists(tx, nodeDpn, groupId)) {
906             InstanceIdentifier<Bucket> bucketInstanceId = buildBucketInstanceIdentifier(groupId, bucketId, nodeDpn);
907             tx.delete(bucketInstanceId);
908         } else {
909             LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
910         }
911     }
912
913     private void deleteBucket(BigInteger dpId, long groupId, long bucketId, WriteTransaction tx) {
914         Node nodeDpn = buildDpnNode(dpId);
915         if (groupExists(nodeDpn, groupId)) {
916             InstanceIdentifier<Bucket> bucketInstanceId = buildBucketInstanceIdentifier(groupId, bucketId, nodeDpn);
917             tx.delete(LogicalDatastoreType.CONFIGURATION, bucketInstanceId);
918         } else {
919             LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
920         }
921     }
922
923     @Override
924     public boolean groupExists(BigInteger dpId, long groupId) {
925         return groupExists(buildDpnNode(dpId), groupId);
926     }
927
928     private boolean groupExists(Node nodeDpn, long groupId) {
929         InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
930         try {
931             return singleTxDb.syncReadOptional(LogicalDatastoreType.CONFIGURATION, groupInstanceId).isPresent();
932         } catch (ReadFailedException e) {
933             LOG.warn("Exception while reading group {} for Node {}", groupId, nodeDpn.key());
934         }
935         return false;
936     }
937
938     private boolean groupExists(TypedReadTransaction<Configuration> tx, Node nodeDpn, long groupId)
939            throws ExecutionException, InterruptedException {
940         return tx.read(buildGroupInstanceIdentifier(groupId, nodeDpn)).get().isPresent();
941     }
942
943     private InstanceIdentifier<Group> buildGroupInstanceIdentifier(long groupId, Node nodeDpn) {
944         InstanceIdentifier<Group> groupInstanceId = InstanceIdentifier.builder(Nodes.class)
945                 .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
946                 .child(Group.class, new GroupKey(new GroupId(groupId))).build();
947         return groupInstanceId;
948     }
949
950     private boolean flowExists(BigInteger dpId, short tableId, FlowKey flowKey) {
951         InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
952         try {
953             Optional<Flow> flowOptional = singleTxDb.syncReadOptional(LogicalDatastoreType.CONFIGURATION,
954                     flowInstanceId);
955             return flowOptional.isPresent();
956         } catch (ReadFailedException e) {
957             LOG.warn("Exception while reading flow {} for dpn {}", flowKey, dpId);
958         }
959         return false;
960     }
961
962     private static InstanceIdentifier<Flow> buildFlowInstanceIdentifier(BigInteger dpnId, short tableId,
963             FlowKey flowKey) {
964         InstanceIdentifier<Flow> flowInstanceId = InstanceIdentifier.builder(Nodes.class)
965                 .child(Node.class, buildDpnNode(dpnId).key()).augmentation(FlowCapableNode.class)
966                 .child(Table.class, new TableKey(tableId)).child(Flow.class, flowKey).build();
967         return flowInstanceId;
968     }
969
970     private InstanceIdentifier<Bucket> buildBucketInstanceIdentifier(long groupId, long bucketId,
971             Node nodeDpn) {
972         InstanceIdentifier<Bucket> bucketInstanceId = InstanceIdentifier.builder(Nodes.class)
973                 .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
974                 .child(Group.class, new GroupKey(new GroupId(groupId)))
975                 .child(Buckets.class)
976                 .child(Bucket.class, new BucketKey(new BucketId(bucketId))).build();
977         return bucketInstanceId;
978     }
979
980     private FluentFuture<Void> addCallBackForDeleteFlowAndReturnm(FluentFuture<Void> fluentFuture) {
981         return callBack(fluentFuture, "Delete Flow");
982     }
983
984     private FluentFuture<Void> addCallBackForInstallFlowAndReturn(FluentFuture<Void> fluentFuture) {
985         return callBack(fluentFuture, "Install Flow");
986     }
987
988     private FluentFuture<Void> addCallBackForInstallGroupAndReturn(FluentFuture<Void> fluentFuture) {
989         return callBack(fluentFuture, "Install Group");
990     }
991
992     // Generic for handling callbacks
993     private FluentFuture<Void> callBack(FluentFuture<Void> fluentFuture, String log) {
994         fluentFuture.addCallback(new FutureCallback<Void>() {
995             @Override
996             public void onSuccess(final Void result) {
997                 // Committed successfully
998                 LOG.debug("{} -- Committedsuccessfully ", log);
999             }
1000
1001             @Override
1002             public void onFailure(final Throwable throwable) {
1003                 // Transaction failed
1004
1005                 if (throwable instanceof OptimisticLockFailedException) {
1006                     // Failed because of concurrent transaction modifying same
1007                     // data
1008                     LOG.error("{} -- Failed because of concurrent transaction modifying same data", log);
1009                 } else {
1010                     // Some other type of TransactionCommitFailedException
1011                     LOG.error("{} -- Some other type of TransactionCommitFailedException",log, throwable);
1012                 }
1013             }
1014         }, MoreExecutors.directExecutor());
1015         return fluentFuture;
1016     }
1017 }