2 * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.genius.mdsalutil.internal;
11 import static org.opendaylight.infrautils.utils.concurrent.Executors.newListeningSingleThreadExecutor;
12 import static org.opendaylight.mdsal.binding.api.WriteTransaction.CREATE_MISSING_PARENTS;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.util.concurrent.FluentFuture;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.MoreExecutors;
20 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
21 import java.util.Optional;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentMap;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.ExecutorService;
26 import javax.inject.Inject;
27 import javax.inject.Singleton;
28 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
29 import org.opendaylight.genius.infra.Datastore;
30 import org.opendaylight.genius.infra.Datastore.Configuration;
31 import org.opendaylight.genius.infra.RetryingManagedNewTransactionRunner;
32 import org.opendaylight.genius.infra.TypedReadTransaction;
33 import org.opendaylight.genius.infra.TypedReadWriteTransaction;
34 import org.opendaylight.genius.infra.TypedWriteTransaction;
35 import org.opendaylight.genius.mdsalutil.FlowEntity;
36 import org.opendaylight.genius.mdsalutil.FlowInfoKey;
37 import org.opendaylight.genius.mdsalutil.GroupEntity;
38 import org.opendaylight.genius.mdsalutil.GroupInfoKey;
39 import org.opendaylight.genius.mdsalutil.MDSALUtil;
40 import org.opendaylight.genius.mdsalutil.interfaces.IMdsalApiManager;
41 import org.opendaylight.infrautils.inject.AbstractLifecycle;
42 import org.opendaylight.infrautils.utils.concurrent.Executors;
43 import org.opendaylight.mdsal.binding.api.DataBroker;
44 import org.opendaylight.mdsal.binding.api.WriteTransaction;
45 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
46 import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
47 import org.opendaylight.serviceutils.tools.listener.AbstractClusteredAsyncDataTreeChangeListener;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.BucketId;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.BucketKey;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
68 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
69 import org.opendaylight.yangtools.yang.common.Uint64;
70 import org.slf4j.Logger;
71 import org.slf4j.LoggerFactory;
74 @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
75 public class MDSALManager extends AbstractLifecycle implements IMdsalApiManager {
77 private static final Logger LOG = LoggerFactory.getLogger(MDSALManager.class);
79 private final DataBroker dataBroker;
80 private final RetryingManagedNewTransactionRunner txRunner;
81 private final FlowBatchingUtils flowBatchingUtils = new FlowBatchingUtils();
83 private final ConcurrentMap<FlowInfoKey, Runnable> flowMap = new ConcurrentHashMap<>();
84 private final ConcurrentMap<GroupInfoKey, Runnable> groupMap = new ConcurrentHashMap<>();
85 private final ExecutorService executorService = newListeningSingleThreadExecutor("genius-MDSALManager", LOG);
86 private final SingleTransactionDataBroker singleTxDb;
87 private final FlowListener flowListener = new FlowListener();
88 private final FlowConfigListener flowConfigListener = new FlowConfigListener();
89 private final GroupListener groupListener = new GroupListener();
92 * Writes the flows and Groups to the MD SAL DataStore which will be sent to
93 * the openflowplugin for installing flows/groups on the switch. Other
94 * modules of VPN service that wants to install flows / groups on the switch
98 * dataBroker reference
99 * @param pktProcService
100 * PacketProcessingService for sending the packet outs
103 public MDSALManager(DataBroker db, PacketProcessingService pktProcService) {
108 public MDSALManager(DataBroker db) {
109 this.dataBroker = db;
110 this.txRunner = new RetryingManagedNewTransactionRunner(db);
111 singleTxDb = new SingleTransactionDataBroker(dataBroker);
112 LOG.info("MDSAL Manager Initialized ");
116 protected void start() {
117 LOG.info("{} start", getClass().getSimpleName());
119 int batchSize = Integer.getInteger("batch.size", 1000);
120 int batchInterval = Integer.getInteger("batch.wait.time", 500);
122 flowBatchingUtils.registerWithBatchManager(new MdSalUtilBatchHandler(dataBroker, batchSize, batchInterval));
126 protected void stop() {
127 LOG.info("{} stop", getClass().getSimpleName());
129 flowListener.close();
130 flowConfigListener.close();
131 groupListener.close();
135 FluentFuture<Void> installFlowInternal(FlowEntity flowEntity) {
136 return addCallBackForInstallFlowAndReturn(txRunner
137 .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
138 tx -> writeFlowEntityInternal(flowEntity, tx)));
141 private FluentFuture<Void> installFlowInternal(Uint64 dpId, Flow flow) {
142 return txRunner.callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
143 tx -> writeFlowInternal(dpId, flow, tx));
146 private static void writeFlowEntityInternal(FlowEntity flowEntity,
147 TypedWriteTransaction<Datastore.Configuration> tx) {
148 FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
149 FlowBuilder flowbld = flowEntity.getFlowBuilder();
150 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(flowEntity.getDpnId(),
151 flowEntity.getTableId(), flowKey);
152 tx.put(flowInstanceId, flowbld.build(), true);
155 private static void writeFlowInternal(Uint64 dpId, Flow flow,
156 TypedWriteTransaction<Datastore.Configuration> tx) {
157 FlowKey flowKey = new FlowKey(new FlowId(flow.getId()));
158 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId,
159 flow.getTableId().toJava(), flowKey);
160 tx.put(flowInstanceId, flow, true);
164 FluentFuture<Void> installGroupInternal(GroupEntity groupEntity) {
165 return addCallBackForInstallGroupAndReturn(txRunner
166 .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
167 tx -> writeGroupEntityInternal(groupEntity, tx)));
170 private static void writeGroupEntityInternal(GroupEntity groupEntity,
171 TypedWriteTransaction<Datastore.Configuration> tx) {
172 Group group = groupEntity.getGroupBuilder().build();
173 Node nodeDpn = buildDpnNode(groupEntity.getDpnId());
174 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupEntity.getGroupId(), nodeDpn);
175 tx.put(groupInstanceId, group, true);
179 FluentFuture<Void> removeFlowInternal(FlowEntity flowEntity) {
180 return addCallBackForDeleteFlowAndReturn(txRunner
181 .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
182 tx -> deleteFlowEntityInternal(flowEntity, tx)));
185 private void deleteFlowEntityInternal(FlowEntity flowEntity, TypedWriteTransaction<Datastore.Configuration> tx) {
186 Uint64 dpId = flowEntity.getDpnId();
187 short tableId = flowEntity.getTableId();
188 FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
189 deleteFlow(dpId, tableId, flowKey, tx);
192 private void deleteFlow(Uint64 dpId, short tableId, FlowKey flowKey, WriteTransaction tx) {
193 if (flowExists(dpId, tableId, flowKey)) {
194 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
195 tx.delete(LogicalDatastoreType.CONFIGURATION, flowInstanceId);
197 LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
201 private void deleteFlow(Uint64 dpId, short tableId, FlowKey flowKey,
202 TypedWriteTransaction<Datastore.Configuration> tx) {
203 if (flowExists(dpId, tableId, flowKey)) {
204 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
205 tx.delete(flowInstanceId);
207 LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
211 private FluentFuture<Void> removeFlowNewInternal(Uint64 dpnId, Flow flowEntity) {
212 LOG.debug("Remove flow {}", flowEntity);
213 return txRunner.callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
215 FlowKey flowKey = new FlowKey(flowEntity.getId());
216 short tableId = flowEntity.getTableId().toJava();
217 deleteFlow(dpnId, tableId, flowKey, tx);
222 FluentFuture<Void> removeGroupInternal(Uint64 dpnId, long groupId) {
223 return addCallBackForInstallGroupAndReturn(txRunner
224 .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
225 tx -> removeGroupInternal(dpnId, groupId, tx)));
228 private void removeGroupInternal(Uint64 dpnId, long groupId,
229 TypedWriteTransaction<Datastore.Configuration> tx) {
230 Node nodeDpn = buildDpnNode(dpnId);
231 if (groupExists(nodeDpn, groupId)) {
232 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
233 tx.delete(groupInstanceId);
235 LOG.debug("Group {} does not exist for dpn {}", groupId, dpnId);
239 private static Node buildDpnNode(Uint64 dpnId) {
240 NodeId nodeId = new NodeId("openflow:" + dpnId);
241 Node nodeDpn = new NodeBuilder().setId(nodeId).withKey(new NodeKey(nodeId)).build();
246 private static String getGroupKey(long groupId, Uint64 dpId) {
247 String synchronizingKey = "group-key-" + groupId + dpId;
248 return synchronizingKey.intern();
251 private static String getFlowKey(Uint64 dpId, short tableId, FlowKey flowKey) {
252 String synchronizingKey = "flow-key-" + dpId + tableId + flowKey;
253 return synchronizingKey.intern();
256 private void syncSetUpFlowInternal(FlowEntity flowEntity, boolean isRemove) {
257 if (LOG.isTraceEnabled()) {
258 LOG.trace("syncSetUpFlow for flowEntity {} ", flowEntity);
260 Flow flow = flowEntity.getFlowBuilder().build();
261 String flowId = flowEntity.getFlowId();
262 short tableId = flowEntity.getTableId();
263 Uint64 dpId = flowEntity.getDpnId();
264 FlowKey flowKey = new FlowKey(new FlowId(flowId));
265 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
268 synchronized (getFlowKey(dpId, tableId, flowKey)) {
269 if (flowExists(dpId, tableId, flowKey)) {
270 MDSALUtil.syncDelete(dataBroker, LogicalDatastoreType.CONFIGURATION, flowInstanceId);
272 LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
276 MDSALUtil.syncWrite(dataBroker, LogicalDatastoreType.CONFIGURATION, flowInstanceId, flow);
280 private void syncSetUpGroupInternal(GroupEntity groupEntity, boolean isRemove) {
281 if (LOG.isTraceEnabled()) {
282 LOG.trace("syncSetUpGroup for groupEntity {} ", groupEntity);
284 Group group = groupEntity.getGroupBuilder().build();
285 Uint64 dpId = groupEntity.getDpnId();
286 long groupId = groupEntity.getGroupId();
287 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, buildDpnNode(dpId));
290 synchronized (getGroupKey(groupId, dpId)) {
291 if (groupExists(dpId, groupId)) {
292 MDSALUtil.syncDelete(dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId);
294 LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
298 MDSALUtil.syncWrite(dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId, group);
302 private class GroupListener extends AbstractClusteredAsyncDataTreeChangeListener<Group> {
305 super(dataBroker, LogicalDatastoreType.OPERATIONAL,InstanceIdentifier.create(Nodes.class).child(Node.class)
306 .augmentation(FlowCapableNode.class).child(Group.class),
307 Executors.newSingleThreadExecutor("GroupListener", LOG));
311 public void remove(InstanceIdentifier<Group> identifier, Group del) {
312 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
313 executeNotifyTaskIfRequired(dpId, del);
316 private void executeNotifyTaskIfRequired(Uint64 dpId, Group group) {
317 GroupInfoKey groupKey = new GroupInfoKey(dpId, group.getGroupId().getValue().toJava());
318 Runnable notifyTask = groupMap.remove(groupKey);
319 if (notifyTask == null) {
322 executorService.execute(notifyTask);
326 public void update(InstanceIdentifier<Group> identifier, Group original, Group update) {
327 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
328 executeNotifyTaskIfRequired(dpId, update);
332 public void add(InstanceIdentifier<Group> identifier, Group add) {
333 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
334 executeNotifyTaskIfRequired(dpId, add);
338 private class FlowListener extends AbstractClusteredAsyncDataTreeChangeListener<Flow> {
341 super(dataBroker, LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class).child(Node.class)
342 .augmentation(FlowCapableNode.class).child(Table.class).child(Flow.class),
343 Executors.newSingleThreadExecutor("FlowListener", LOG));
347 public void remove(InstanceIdentifier<Flow> identifier, Flow del) {
348 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
349 notifyTaskIfRequired(dpId, del);
352 private void notifyTaskIfRequired(Uint64 dpId, Flow flow) {
353 FlowInfoKey flowKey = new FlowInfoKey(dpId, flow.getTableId().toJava(),
354 flow.getMatch(), flow.getId().getValue());
355 Runnable notifyTask = flowMap.remove(flowKey);
356 if (notifyTask == null) {
359 executorService.execute(notifyTask);
363 public void update(InstanceIdentifier<Flow> identifier, Flow original, Flow update) {
367 public void add(InstanceIdentifier<Flow> identifier, Flow add) {
368 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
369 notifyTaskIfRequired(dpId, add);
374 private class FlowConfigListener extends AbstractClusteredAsyncDataTreeChangeListener<Flow> {
375 private final Logger flowLog = LoggerFactory.getLogger(FlowConfigListener.class);
377 FlowConfigListener() {
378 super(dataBroker, LogicalDatastoreType.OPERATIONAL,InstanceIdentifier.create(Nodes.class).child(Node.class)
379 .augmentation(FlowCapableNode.class).child(Table.class).child(Flow.class),
380 Executors.newSingleThreadExecutor("FlowConfigListener", LOG));
384 public void remove(InstanceIdentifier<Flow> identifier, Flow del) {
385 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
386 flowLog.trace("FlowId {} deleted from Table {} on DPN {}",
387 del.getId().getValue(), del.getTableId(), dpId);
391 public void update(InstanceIdentifier<Flow> identifier, Flow original, Flow update) {
395 public void add(InstanceIdentifier<Flow> identifier, Flow add) {
396 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
397 flowLog.debug("FlowId {} added to Table {} on DPN {}",
398 add.getId().getValue(), add.getTableId(), dpId);
402 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
403 justification = "https://github.com/spotbugs/spotbugs/issues/811")
404 private static Uint64 getDpnFromString(String dpnString) {
405 String[] split = dpnString.split(":");
406 return Uint64.valueOf(split[1]);
410 public FluentFuture<Void> installFlow(FlowEntity flowEntity) {
411 return installFlowInternal(flowEntity);
415 public FluentFuture<Void> installFlow(Uint64 dpId, Flow flowEntity) {
416 return installFlowInternal(dpId, flowEntity);
420 public FluentFuture<Void> installFlow(Uint64 dpId, FlowEntity flowEntity) {
421 return installFlowInternal(dpId, flowEntity.getFlowBuilder().build());
425 public ListenableFuture<Void> removeFlow(Uint64 dpId, short tableId, FlowId flowId) {
426 ListenableFuture<Void> future = txRunner.callWithNewWriteOnlyTransactionAndSubmit(
427 tx -> deleteFlow(dpId, tableId, new FlowKey(flowId), tx));
429 Futures.addCallback(future, new FutureCallback<Void>() {
431 public void onSuccess(final Void result) {
432 // Committed successfully
433 LOG.debug("Delete Flow -- Committed successfully");
437 public void onFailure(final Throwable throwable) {
438 // Transaction failed
439 if (throwable instanceof OptimisticLockFailedException) {
440 // Failed because of concurrent transaction modifying same
442 LOG.error("Delete Flow -- Failed because of concurrent transaction modifying same data");
444 // Some other type of TransactionCommitFailedException
445 LOG.error("Delete Flow -- Some other type of TransactionCommitFailedException", throwable);
449 }, MoreExecutors.directExecutor());
455 public FluentFuture<Void> removeFlow(Uint64 dpId, Flow flowEntity) {
456 return removeFlowNewInternal(dpId, flowEntity);
460 public FluentFuture<Void> removeFlow(FlowEntity flowEntity) {
461 return removeFlowInternal(flowEntity);
465 public void removeFlow(TypedReadWriteTransaction<Configuration> tx, FlowEntity flowEntity)
466 throws ExecutionException, InterruptedException {
467 removeFlow(tx, flowEntity.getDpnId(), flowEntity.getFlowId(), flowEntity.getTableId());
471 public void removeFlow(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, Flow flow)
472 throws ExecutionException, InterruptedException {
473 removeFlow(tx, dpId, flow.key(), flow.getTableId().toJava());
477 public void removeFlow(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, String flowId, short tableId)
478 throws ExecutionException, InterruptedException {
479 removeFlow(tx, dpId, new FlowKey(new FlowId(flowId)), tableId);
483 public void removeFlow(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, FlowKey flowKey,
484 short tableId) throws ExecutionException, InterruptedException {
485 InstanceIdentifier<Flow> flowInstanceIdentifier = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
486 if (tx.read(flowInstanceIdentifier).get().isPresent()) {
487 tx.delete(flowInstanceIdentifier);
492 public void removeGroup(GroupEntity groupEntity) {
493 removeGroupInternal(groupEntity.getDpnId(), groupEntity.getGroupId());
497 public void removeGroup(TypedReadWriteTransaction<Configuration> tx, GroupEntity groupEntity)
498 throws ExecutionException, InterruptedException {
499 removeGroup(tx, groupEntity.getDpnId(), groupEntity.getGroupId());
503 public void removeGroup(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, Group group)
504 throws ExecutionException, InterruptedException {
505 removeGroup(tx, dpId, group.getGroupId().getValue().toJava());
509 public void removeGroup(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, long groupId)
510 throws ExecutionException, InterruptedException {
511 Node nodeDpn = buildDpnNode(dpId);
512 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
513 if (tx.read(groupInstanceId).get().isPresent()) {
514 tx.delete(groupInstanceId);
516 LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
521 public void syncRemoveFlow(FlowEntity flowEntity, long delayTime) {
522 syncSetUpFlowInternal(flowEntity, true);
526 public void syncRemoveFlow(FlowEntity flowEntity) {
527 syncSetUpFlowInternal(flowEntity, true);
531 public void syncInstallFlow(FlowEntity flowEntity, long delayTime) {
532 syncSetUpFlowInternal(flowEntity, false);
536 public void syncInstallFlow(FlowEntity flowEntity) {
537 syncSetUpFlowInternal(flowEntity, false);
541 public void syncInstallGroup(GroupEntity groupEntity) {
542 syncSetUpGroupInternal(groupEntity, false);
546 public void syncRemoveGroup(GroupEntity groupEntity) {
547 syncSetUpGroupInternal(groupEntity, true);
551 public void addFlow(TypedWriteTransaction<Configuration> tx, FlowEntity flowEntity) {
552 addFlow(tx, flowEntity.getDpnId(), flowEntity.getFlowBuilder().build());
556 public void addFlow(TypedWriteTransaction<Configuration> tx, Uint64 dpId, Flow flow) {
557 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId,
558 flow.getTableId().toJava(), flow.key());
559 tx.put(flowInstanceId, flow, CREATE_MISSING_PARENTS);
563 public void addGroup(TypedWriteTransaction<Configuration> tx, GroupEntity groupEntity) {
564 addGroup(tx, groupEntity.getDpnId(), groupEntity.getGroupBuilder().build());
568 public void addGroup(TypedWriteTransaction<Configuration> tx, Uint64 dpId, Group group) {
569 Node nodeDpn = buildDpnNode(dpId);
570 long groupId = group.getGroupId().getValue().toJava();
571 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
572 tx.put(groupInstanceId, group, CREATE_MISSING_PARENTS);
576 public void addBucket(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, long groupId, Bucket bucket)
577 throws ExecutionException, InterruptedException {
578 Node nodeDpn = buildDpnNode(dpId);
579 if (groupExists(tx, nodeDpn, groupId)) {
580 InstanceIdentifier<Bucket> bucketInstanceId = buildBucketInstanceIdentifier(groupId,
581 bucket.getBucketId().getValue().toJava(), nodeDpn);
582 tx.put(bucketInstanceId, bucket);
587 public void removeBucket(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, long groupId, long bucketId)
588 throws ExecutionException, InterruptedException {
589 Node nodeDpn = buildDpnNode(dpId);
590 if (groupExists(tx, nodeDpn, groupId)) {
591 InstanceIdentifier<Bucket> bucketInstanceId = buildBucketInstanceIdentifier(groupId, bucketId, nodeDpn);
592 tx.delete(bucketInstanceId);
594 LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
599 public boolean groupExists(Uint64 dpId, long groupId) {
600 return groupExists(buildDpnNode(dpId), groupId);
603 private boolean groupExists(Node nodeDpn, long groupId) {
604 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
606 return singleTxDb.syncReadOptional(LogicalDatastoreType.CONFIGURATION, groupInstanceId).isPresent();
607 } catch (ExecutionException | InterruptedException e) {
608 LOG.warn("Exception while reading group {} for Node {}", groupId, nodeDpn.key());
613 private static boolean groupExists(TypedReadTransaction<Configuration> tx, Node nodeDpn, long groupId)
614 throws ExecutionException, InterruptedException {
615 return tx.exists(buildGroupInstanceIdentifier(groupId, nodeDpn)).get();
618 private static InstanceIdentifier<Group> buildGroupInstanceIdentifier(long groupId, Node nodeDpn) {
619 InstanceIdentifier<Group> groupInstanceId = InstanceIdentifier.builder(Nodes.class)
620 .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
621 .child(Group.class, new GroupKey(new GroupId(groupId))).build();
622 return groupInstanceId;
625 private boolean flowExists(Uint64 dpId, short tableId, FlowKey flowKey) {
626 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
628 Optional<Flow> flowOptional = singleTxDb.syncReadOptional(LogicalDatastoreType.CONFIGURATION,
630 return flowOptional.isPresent();
631 } catch (ExecutionException | InterruptedException e) {
632 LOG.warn("Exception while reading flow {} for dpn {}", flowKey, dpId);
637 private static InstanceIdentifier<Flow> buildFlowInstanceIdentifier(Uint64 dpnId, short tableId,
639 InstanceIdentifier<Flow> flowInstanceId = InstanceIdentifier.builder(Nodes.class)
640 .child(Node.class, buildDpnNode(dpnId).key()).augmentation(FlowCapableNode.class)
641 .child(Table.class, new TableKey(tableId)).child(Flow.class, flowKey).build();
642 return flowInstanceId;
645 private static InstanceIdentifier<Bucket> buildBucketInstanceIdentifier(long groupId, long bucketId,
647 InstanceIdentifier<Bucket> bucketInstanceId = InstanceIdentifier.builder(Nodes.class)
648 .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
649 .child(Group.class, new GroupKey(new GroupId(groupId)))
650 .child(Buckets.class)
651 .child(Bucket.class, new BucketKey(new BucketId(bucketId))).build();
652 return bucketInstanceId;
655 private static FluentFuture<Void> addCallBackForDeleteFlowAndReturn(FluentFuture<Void> fluentFuture) {
656 return callBack(fluentFuture, "Delete Flow");
659 private static FluentFuture<Void> addCallBackForInstallFlowAndReturn(FluentFuture<Void> fluentFuture) {
660 return callBack(fluentFuture, "Install Flow");
663 private static FluentFuture<Void> addCallBackForInstallGroupAndReturn(FluentFuture<Void> fluentFuture) {
664 return callBack(fluentFuture, "Install Group");
667 // Generic for handling callbacks
668 private static FluentFuture<Void> callBack(FluentFuture<Void> fluentFuture, String log) {
669 fluentFuture.addCallback(new FutureCallback<Void>() {
671 public void onSuccess(final Void result) {
672 // Committed successfully
673 LOG.debug("{} -- Committedsuccessfully ", log);
677 public void onFailure(final Throwable throwable) {
678 // Transaction failed
680 if (throwable instanceof OptimisticLockFailedException) {
681 // Failed because of concurrent transaction modifying same
683 LOG.error("{} -- Failed because of concurrent transaction modifying same data", log);
685 // Some other type of TransactionCommitFailedException
686 LOG.error("{} -- Some other type of TransactionCommitFailedException",log, throwable);
689 }, MoreExecutors.directExecutor());