2 * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.genius.mdsalutil.internal;
11 import static org.opendaylight.infrautils.utils.concurrent.Executors.newListeningSingleThreadExecutor;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.util.concurrent.FluentFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
20 import java.util.Optional;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.ExecutorService;
25 import javax.inject.Inject;
26 import javax.inject.Singleton;
27 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
28 import org.opendaylight.genius.infra.Datastore;
29 import org.opendaylight.genius.infra.Datastore.Configuration;
30 import org.opendaylight.genius.infra.RetryingManagedNewTransactionRunner;
31 import org.opendaylight.genius.infra.TypedReadTransaction;
32 import org.opendaylight.genius.infra.TypedReadWriteTransaction;
33 import org.opendaylight.genius.infra.TypedWriteTransaction;
34 import org.opendaylight.genius.mdsalutil.FlowEntity;
35 import org.opendaylight.genius.mdsalutil.FlowInfoKey;
36 import org.opendaylight.genius.mdsalutil.GroupEntity;
37 import org.opendaylight.genius.mdsalutil.GroupInfoKey;
38 import org.opendaylight.genius.mdsalutil.MDSALUtil;
39 import org.opendaylight.genius.mdsalutil.interfaces.IMdsalApiManager;
40 import org.opendaylight.infrautils.inject.AbstractLifecycle;
41 import org.opendaylight.infrautils.utils.concurrent.Executors;
42 import org.opendaylight.mdsal.binding.api.DataBroker;
43 import org.opendaylight.mdsal.binding.api.WriteTransaction;
44 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
45 import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
46 import org.opendaylight.serviceutils.tools.listener.AbstractClusteredAsyncDataTreeChangeListener;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.BucketId;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.BucketKey;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
67 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
68 import org.opendaylight.yangtools.yang.common.Uint64;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
73 @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
74 public class MDSALManager extends AbstractLifecycle implements IMdsalApiManager {
76 private static final Logger LOG = LoggerFactory.getLogger(MDSALManager.class);
78 private final DataBroker dataBroker;
79 private final RetryingManagedNewTransactionRunner txRunner;
80 private final FlowBatchingUtils flowBatchingUtils = new FlowBatchingUtils();
82 private final ConcurrentMap<FlowInfoKey, Runnable> flowMap = new ConcurrentHashMap<>();
83 private final ConcurrentMap<GroupInfoKey, Runnable> groupMap = new ConcurrentHashMap<>();
84 private final ExecutorService executorService = newListeningSingleThreadExecutor("genius-MDSALManager", LOG);
85 private final SingleTransactionDataBroker singleTxDb;
86 private final FlowListener flowListener = new FlowListener();
87 private final FlowConfigListener flowConfigListener = new FlowConfigListener();
88 private final GroupListener groupListener = new GroupListener();
91 * Writes the flows and Groups to the MD SAL DataStore which will be sent to
92 * the openflowplugin for installing flows/groups on the switch. Other
93 * modules of VPN service that wants to install flows / groups on the switch
97 * dataBroker reference
98 * @param pktProcService
99 * PacketProcessingService for sending the packet outs
102 public MDSALManager(DataBroker db, PacketProcessingService pktProcService) {
107 public MDSALManager(DataBroker db) {
108 this.dataBroker = db;
109 this.txRunner = new RetryingManagedNewTransactionRunner(db);
110 singleTxDb = new SingleTransactionDataBroker(dataBroker);
111 LOG.info("MDSAL Manager Initialized ");
115 protected void start() {
116 LOG.info("{} start", getClass().getSimpleName());
118 int batchSize = Integer.getInteger("batch.size", 1000);
119 int batchInterval = Integer.getInteger("batch.wait.time", 500);
121 flowBatchingUtils.registerWithBatchManager(new MdSalUtilBatchHandler(dataBroker, batchSize, batchInterval));
125 protected void stop() {
126 LOG.info("{} stop", getClass().getSimpleName());
128 flowListener.close();
129 flowConfigListener.close();
130 groupListener.close();
134 FluentFuture<Void> installFlowInternal(FlowEntity flowEntity) {
135 return addCallBackForInstallFlowAndReturn(txRunner
136 .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
137 tx -> writeFlowEntityInternal(flowEntity, tx)));
140 private FluentFuture<Void> installFlowInternal(Uint64 dpId, Flow flow) {
141 return txRunner.callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
142 tx -> writeFlowInternal(dpId, flow, tx));
145 private static void writeFlowEntityInternal(FlowEntity flowEntity,
146 TypedWriteTransaction<Datastore.Configuration> tx) {
147 FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
148 FlowBuilder flowbld = flowEntity.getFlowBuilder();
149 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(flowEntity.getDpnId(),
150 flowEntity.getTableId(), flowKey);
151 tx.mergeParentStructurePut(flowInstanceId, flowbld.build());
154 private static void writeFlowInternal(Uint64 dpId, Flow flow,
155 TypedWriteTransaction<Datastore.Configuration> tx) {
156 FlowKey flowKey = new FlowKey(new FlowId(flow.getId()));
157 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId,
158 flow.getTableId().toJava(), flowKey);
159 tx.mergeParentStructurePut(flowInstanceId, flow);
163 FluentFuture<Void> installGroupInternal(GroupEntity groupEntity) {
164 return addCallBackForInstallGroupAndReturn(txRunner
165 .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
166 tx -> writeGroupEntityInternal(groupEntity, tx)));
169 private static void writeGroupEntityInternal(GroupEntity groupEntity,
170 TypedWriteTransaction<Datastore.Configuration> tx) {
171 Group group = groupEntity.getGroupBuilder().build();
172 Node nodeDpn = buildDpnNode(groupEntity.getDpnId());
173 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupEntity.getGroupId(), nodeDpn);
174 tx.mergeParentStructurePut(groupInstanceId, group);
178 FluentFuture<Void> removeFlowInternal(FlowEntity flowEntity) {
179 return addCallBackForDeleteFlowAndReturn(txRunner
180 .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
181 tx -> deleteFlowEntityInternal(flowEntity, tx)));
184 private void deleteFlowEntityInternal(FlowEntity flowEntity, TypedWriteTransaction<Datastore.Configuration> tx) {
185 Uint64 dpId = flowEntity.getDpnId();
186 short tableId = flowEntity.getTableId();
187 FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
188 deleteFlow(dpId, tableId, flowKey, tx);
191 private void deleteFlow(Uint64 dpId, short tableId, FlowKey flowKey, WriteTransaction tx) {
192 if (flowExists(dpId, tableId, flowKey)) {
193 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
194 tx.delete(LogicalDatastoreType.CONFIGURATION, flowInstanceId);
196 LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
200 private void deleteFlow(Uint64 dpId, short tableId, FlowKey flowKey,
201 TypedWriteTransaction<Datastore.Configuration> tx) {
202 if (flowExists(dpId, tableId, flowKey)) {
203 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
204 tx.delete(flowInstanceId);
206 LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
210 private FluentFuture<Void> removeFlowNewInternal(Uint64 dpnId, Flow flowEntity) {
211 LOG.debug("Remove flow {}", flowEntity);
212 return txRunner.callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
214 FlowKey flowKey = new FlowKey(flowEntity.getId());
215 short tableId = flowEntity.getTableId().toJava();
216 deleteFlow(dpnId, tableId, flowKey, tx);
221 FluentFuture<Void> removeGroupInternal(Uint64 dpnId, long groupId) {
222 return addCallBackForInstallGroupAndReturn(txRunner
223 .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
224 tx -> removeGroupInternal(dpnId, groupId, tx)));
227 private void removeGroupInternal(Uint64 dpnId, long groupId,
228 TypedWriteTransaction<Datastore.Configuration> tx) {
229 Node nodeDpn = buildDpnNode(dpnId);
230 if (groupExists(nodeDpn, groupId)) {
231 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
232 tx.delete(groupInstanceId);
234 LOG.debug("Group {} does not exist for dpn {}", groupId, dpnId);
238 private static Node buildDpnNode(Uint64 dpnId) {
239 NodeId nodeId = new NodeId("openflow:" + dpnId);
240 Node nodeDpn = new NodeBuilder().setId(nodeId).withKey(new NodeKey(nodeId)).build();
245 private static String getGroupKey(long groupId, Uint64 dpId) {
246 String synchronizingKey = "group-key-" + groupId + dpId;
247 return synchronizingKey.intern();
250 private static String getFlowKey(Uint64 dpId, short tableId, FlowKey flowKey) {
251 String synchronizingKey = "flow-key-" + dpId + tableId + flowKey;
252 return synchronizingKey.intern();
255 private void syncSetUpFlowInternal(FlowEntity flowEntity, boolean isRemove) {
256 if (LOG.isTraceEnabled()) {
257 LOG.trace("syncSetUpFlow for flowEntity {} ", flowEntity);
259 Flow flow = flowEntity.getFlowBuilder().build();
260 String flowId = flowEntity.getFlowId();
261 short tableId = flowEntity.getTableId();
262 Uint64 dpId = flowEntity.getDpnId();
263 FlowKey flowKey = new FlowKey(new FlowId(flowId));
264 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
267 synchronized (getFlowKey(dpId, tableId, flowKey)) {
268 if (flowExists(dpId, tableId, flowKey)) {
269 MDSALUtil.syncDelete(dataBroker, LogicalDatastoreType.CONFIGURATION, flowInstanceId);
271 LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
275 MDSALUtil.syncWrite(dataBroker, LogicalDatastoreType.CONFIGURATION, flowInstanceId, flow);
279 private void syncSetUpGroupInternal(GroupEntity groupEntity, boolean isRemove) {
280 if (LOG.isTraceEnabled()) {
281 LOG.trace("syncSetUpGroup for groupEntity {} ", groupEntity);
283 Group group = groupEntity.getGroupBuilder().build();
284 Uint64 dpId = groupEntity.getDpnId();
285 long groupId = groupEntity.getGroupId();
286 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, buildDpnNode(dpId));
289 synchronized (getGroupKey(groupId, dpId)) {
290 if (groupExists(dpId, groupId)) {
291 MDSALUtil.syncDelete(dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId);
293 LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
297 MDSALUtil.syncWrite(dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId, group);
301 private class GroupListener extends AbstractClusteredAsyncDataTreeChangeListener<Group> {
304 super(dataBroker, LogicalDatastoreType.OPERATIONAL,InstanceIdentifier.create(Nodes.class).child(Node.class)
305 .augmentation(FlowCapableNode.class).child(Group.class),
306 Executors.newSingleThreadExecutor("GroupListener", LOG));
310 public void remove(InstanceIdentifier<Group> identifier, Group del) {
311 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
312 executeNotifyTaskIfRequired(dpId, del);
315 private void executeNotifyTaskIfRequired(Uint64 dpId, Group group) {
316 GroupInfoKey groupKey = new GroupInfoKey(dpId, group.getGroupId().getValue().toJava());
317 Runnable notifyTask = groupMap.remove(groupKey);
318 if (notifyTask == null) {
321 executorService.execute(notifyTask);
325 public void update(InstanceIdentifier<Group> identifier, Group original, Group update) {
326 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
327 executeNotifyTaskIfRequired(dpId, update);
331 public void add(InstanceIdentifier<Group> identifier, Group add) {
332 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
333 executeNotifyTaskIfRequired(dpId, add);
337 private class FlowListener extends AbstractClusteredAsyncDataTreeChangeListener<Flow> {
340 super(dataBroker, LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class).child(Node.class)
341 .augmentation(FlowCapableNode.class).child(Table.class).child(Flow.class),
342 Executors.newSingleThreadExecutor("FlowListener", LOG));
346 public void remove(InstanceIdentifier<Flow> identifier, Flow del) {
347 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
348 notifyTaskIfRequired(dpId, del);
351 private void notifyTaskIfRequired(Uint64 dpId, Flow flow) {
352 FlowInfoKey flowKey = new FlowInfoKey(dpId, flow.getTableId().toJava(),
353 flow.getMatch(), flow.getId().getValue());
354 Runnable notifyTask = flowMap.remove(flowKey);
355 if (notifyTask == null) {
358 executorService.execute(notifyTask);
362 public void update(InstanceIdentifier<Flow> identifier, Flow original, Flow update) {
366 public void add(InstanceIdentifier<Flow> identifier, Flow add) {
367 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
368 notifyTaskIfRequired(dpId, add);
373 private class FlowConfigListener extends AbstractClusteredAsyncDataTreeChangeListener<Flow> {
374 private final Logger flowLog = LoggerFactory.getLogger(FlowConfigListener.class);
376 FlowConfigListener() {
377 super(dataBroker, LogicalDatastoreType.OPERATIONAL,InstanceIdentifier.create(Nodes.class).child(Node.class)
378 .augmentation(FlowCapableNode.class).child(Table.class).child(Flow.class),
379 Executors.newSingleThreadExecutor("FlowConfigListener", LOG));
383 public void remove(InstanceIdentifier<Flow> identifier, Flow del) {
384 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
385 flowLog.trace("FlowId {} deleted from Table {} on DPN {}",
386 del.getId().getValue(), del.getTableId(), dpId);
390 public void update(InstanceIdentifier<Flow> identifier, Flow original, Flow update) {
394 public void add(InstanceIdentifier<Flow> identifier, Flow add) {
395 Uint64 dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
396 flowLog.debug("FlowId {} added to Table {} on DPN {}",
397 add.getId().getValue(), add.getTableId(), dpId);
401 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
402 justification = "https://github.com/spotbugs/spotbugs/issues/811")
403 private static Uint64 getDpnFromString(String dpnString) {
404 String[] split = dpnString.split(":");
405 return Uint64.valueOf(split[1]);
409 public FluentFuture<Void> installFlow(FlowEntity flowEntity) {
410 return installFlowInternal(flowEntity);
414 public FluentFuture<Void> installFlow(Uint64 dpId, Flow flowEntity) {
415 return installFlowInternal(dpId, flowEntity);
419 public FluentFuture<Void> installFlow(Uint64 dpId, FlowEntity flowEntity) {
420 return installFlowInternal(dpId, flowEntity.getFlowBuilder().build());
424 public ListenableFuture<Void> removeFlow(Uint64 dpId, short tableId, FlowId flowId) {
425 ListenableFuture<Void> future = txRunner.callWithNewWriteOnlyTransactionAndSubmit(
426 tx -> deleteFlow(dpId, tableId, new FlowKey(flowId), tx));
428 Futures.addCallback(future, new FutureCallback<Void>() {
430 public void onSuccess(final Void result) {
431 // Committed successfully
432 LOG.debug("Delete Flow -- Committed successfully");
436 public void onFailure(final Throwable throwable) {
437 // Transaction failed
438 if (throwable instanceof OptimisticLockFailedException) {
439 // Failed because of concurrent transaction modifying same
441 LOG.error("Delete Flow -- Failed because of concurrent transaction modifying same data");
443 // Some other type of TransactionCommitFailedException
444 LOG.error("Delete Flow -- Some other type of TransactionCommitFailedException", throwable);
448 }, MoreExecutors.directExecutor());
454 public FluentFuture<Void> removeFlow(Uint64 dpId, Flow flowEntity) {
455 return removeFlowNewInternal(dpId, flowEntity);
459 public FluentFuture<Void> removeFlow(FlowEntity flowEntity) {
460 return removeFlowInternal(flowEntity);
464 public void removeFlow(TypedReadWriteTransaction<Configuration> tx, FlowEntity flowEntity)
465 throws ExecutionException, InterruptedException {
466 removeFlow(tx, flowEntity.getDpnId(), flowEntity.getFlowId(), flowEntity.getTableId());
470 public void removeFlow(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, Flow flow)
471 throws ExecutionException, InterruptedException {
472 removeFlow(tx, dpId, flow.key(), flow.getTableId().toJava());
476 public void removeFlow(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, String flowId, short tableId)
477 throws ExecutionException, InterruptedException {
478 removeFlow(tx, dpId, new FlowKey(new FlowId(flowId)), tableId);
482 public void removeFlow(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, FlowKey flowKey,
483 short tableId) throws ExecutionException, InterruptedException {
484 InstanceIdentifier<Flow> flowInstanceIdentifier = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
485 if (tx.read(flowInstanceIdentifier).get().isPresent()) {
486 tx.delete(flowInstanceIdentifier);
491 public void removeGroup(GroupEntity groupEntity) {
492 removeGroupInternal(groupEntity.getDpnId(), groupEntity.getGroupId());
496 public void removeGroup(TypedReadWriteTransaction<Configuration> tx, GroupEntity groupEntity)
497 throws ExecutionException, InterruptedException {
498 removeGroup(tx, groupEntity.getDpnId(), groupEntity.getGroupId());
502 public void removeGroup(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, Group group)
503 throws ExecutionException, InterruptedException {
504 removeGroup(tx, dpId, group.getGroupId().getValue().toJava());
508 public void removeGroup(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, long groupId)
509 throws ExecutionException, InterruptedException {
510 Node nodeDpn = buildDpnNode(dpId);
511 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
512 if (tx.read(groupInstanceId).get().isPresent()) {
513 tx.delete(groupInstanceId);
515 LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
520 public void syncRemoveFlow(FlowEntity flowEntity, long delayTime) {
521 syncSetUpFlowInternal(flowEntity, true);
525 public void syncRemoveFlow(FlowEntity flowEntity) {
526 syncSetUpFlowInternal(flowEntity, true);
530 public void syncInstallFlow(FlowEntity flowEntity, long delayTime) {
531 syncSetUpFlowInternal(flowEntity, false);
535 public void syncInstallFlow(FlowEntity flowEntity) {
536 syncSetUpFlowInternal(flowEntity, false);
540 public void syncInstallGroup(GroupEntity groupEntity) {
541 syncSetUpGroupInternal(groupEntity, false);
545 public void syncRemoveGroup(GroupEntity groupEntity) {
546 syncSetUpGroupInternal(groupEntity, true);
550 public void addFlow(TypedWriteTransaction<Configuration> tx, FlowEntity flowEntity) {
551 addFlow(tx, flowEntity.getDpnId(), flowEntity.getFlowBuilder().build());
555 public void addFlow(TypedWriteTransaction<Configuration> tx, Uint64 dpId, Flow flow) {
556 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId,
557 flow.getTableId().toJava(), flow.key());
558 tx.mergeParentStructurePut(flowInstanceId, flow);
562 public void addGroup(TypedWriteTransaction<Configuration> tx, GroupEntity groupEntity) {
563 addGroup(tx, groupEntity.getDpnId(), groupEntity.getGroupBuilder().build());
567 public void addGroup(TypedWriteTransaction<Configuration> tx, Uint64 dpId, Group group) {
568 Node nodeDpn = buildDpnNode(dpId);
569 long groupId = group.getGroupId().getValue().toJava();
570 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
571 tx.mergeParentStructurePut(groupInstanceId, group);
575 public void addBucket(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, long groupId, Bucket bucket)
576 throws ExecutionException, InterruptedException {
577 Node nodeDpn = buildDpnNode(dpId);
578 if (groupExists(tx, nodeDpn, groupId)) {
579 InstanceIdentifier<Bucket> bucketInstanceId = buildBucketInstanceIdentifier(groupId,
580 bucket.getBucketId().getValue().toJava(), nodeDpn);
581 tx.put(bucketInstanceId, bucket);
586 public void removeBucket(TypedReadWriteTransaction<Configuration> tx, Uint64 dpId, long groupId, long bucketId)
587 throws ExecutionException, InterruptedException {
588 Node nodeDpn = buildDpnNode(dpId);
589 if (groupExists(tx, nodeDpn, groupId)) {
590 InstanceIdentifier<Bucket> bucketInstanceId = buildBucketInstanceIdentifier(groupId, bucketId, nodeDpn);
591 tx.delete(bucketInstanceId);
593 LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
598 public boolean groupExists(Uint64 dpId, long groupId) {
599 return groupExists(buildDpnNode(dpId), groupId);
602 private boolean groupExists(Node nodeDpn, long groupId) {
603 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
605 return singleTxDb.syncReadOptional(LogicalDatastoreType.CONFIGURATION, groupInstanceId).isPresent();
606 } catch (ExecutionException | InterruptedException e) {
607 LOG.warn("Exception while reading group {} for Node {}", groupId, nodeDpn.key());
612 private static boolean groupExists(TypedReadTransaction<Configuration> tx, Node nodeDpn, long groupId)
613 throws ExecutionException, InterruptedException {
614 return tx.exists(buildGroupInstanceIdentifier(groupId, nodeDpn)).get();
617 private static InstanceIdentifier<Group> buildGroupInstanceIdentifier(long groupId, Node nodeDpn) {
618 InstanceIdentifier<Group> groupInstanceId = InstanceIdentifier.builder(Nodes.class)
619 .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
620 .child(Group.class, new GroupKey(new GroupId(groupId))).build();
621 return groupInstanceId;
624 private boolean flowExists(Uint64 dpId, short tableId, FlowKey flowKey) {
625 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
627 Optional<Flow> flowOptional = singleTxDb.syncReadOptional(LogicalDatastoreType.CONFIGURATION,
629 return flowOptional.isPresent();
630 } catch (ExecutionException | InterruptedException e) {
631 LOG.warn("Exception while reading flow {} for dpn {}", flowKey, dpId);
636 private static InstanceIdentifier<Flow> buildFlowInstanceIdentifier(Uint64 dpnId, short tableId,
638 InstanceIdentifier<Flow> flowInstanceId = InstanceIdentifier.builder(Nodes.class)
639 .child(Node.class, buildDpnNode(dpnId).key()).augmentation(FlowCapableNode.class)
640 .child(Table.class, new TableKey(tableId)).child(Flow.class, flowKey).build();
641 return flowInstanceId;
644 private static InstanceIdentifier<Bucket> buildBucketInstanceIdentifier(long groupId, long bucketId,
646 InstanceIdentifier<Bucket> bucketInstanceId = InstanceIdentifier.builder(Nodes.class)
647 .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
648 .child(Group.class, new GroupKey(new GroupId(groupId)))
649 .child(Buckets.class)
650 .child(Bucket.class, new BucketKey(new BucketId(bucketId))).build();
651 return bucketInstanceId;
654 private static FluentFuture<Void> addCallBackForDeleteFlowAndReturn(FluentFuture<Void> fluentFuture) {
655 return callBack(fluentFuture, "Delete Flow");
658 private static FluentFuture<Void> addCallBackForInstallFlowAndReturn(FluentFuture<Void> fluentFuture) {
659 return callBack(fluentFuture, "Install Flow");
662 private static FluentFuture<Void> addCallBackForInstallGroupAndReturn(FluentFuture<Void> fluentFuture) {
663 return callBack(fluentFuture, "Install Group");
666 // Generic for handling callbacks
667 private static FluentFuture<Void> callBack(FluentFuture<Void> fluentFuture, String log) {
668 fluentFuture.addCallback(new FutureCallback<Void>() {
670 public void onSuccess(final Void result) {
671 // Committed successfully
672 LOG.debug("{} -- Committedsuccessfully ", log);
676 public void onFailure(final Throwable throwable) {
677 // Transaction failed
679 if (throwable instanceof OptimisticLockFailedException) {
680 // Failed because of concurrent transaction modifying same
682 LOG.error("{} -- Failed because of concurrent transaction modifying same data", log);
684 // Some other type of TransactionCommitFailedException
685 LOG.error("{} -- Some other type of TransactionCommitFailedException",log, throwable);
688 }, MoreExecutors.directExecutor());