2 * Copyright (c) 2016, 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.genius.mdsalutil.internal;
11 import static org.opendaylight.controller.md.sal.binding.api.WriteTransaction.CREATE_MISSING_PARENTS;
12 import static org.opendaylight.infrautils.utils.concurrent.Executors.newListeningSingleThreadExecutor;
13 import static org.opendaylight.infrautils.utils.concurrent.FluentFutures2.toChecked;
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.Optional;
17 import com.google.common.util.concurrent.CheckedFuture;
18 import com.google.common.util.concurrent.FluentFuture;
19 import com.google.common.util.concurrent.FutureCallback;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import com.google.common.util.concurrent.MoreExecutors;
23 import java.math.BigInteger;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ConcurrentMap;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.ExecutorService;
30 import javax.inject.Inject;
31 import javax.inject.Singleton;
32 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
33 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
35 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
36 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
37 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
38 import org.opendaylight.genius.datastoreutils.AsyncClusteredDataTreeChangeListenerBase;
39 import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
40 import org.opendaylight.genius.infra.Datastore;
41 import org.opendaylight.genius.infra.Datastore.Configuration;
42 import org.opendaylight.genius.infra.RetryingManagedNewTransactionRunner;
43 import org.opendaylight.genius.infra.TypedReadTransaction;
44 import org.opendaylight.genius.infra.TypedReadWriteTransaction;
45 import org.opendaylight.genius.infra.TypedWriteTransaction;
46 import org.opendaylight.genius.mdsalutil.ActionInfo;
47 import org.opendaylight.genius.mdsalutil.FlowEntity;
48 import org.opendaylight.genius.mdsalutil.FlowInfoKey;
49 import org.opendaylight.genius.mdsalutil.GroupEntity;
50 import org.opendaylight.genius.mdsalutil.GroupInfoKey;
51 import org.opendaylight.genius.mdsalutil.MDSALUtil;
52 import org.opendaylight.genius.mdsalutil.actions.ActionGroup;
53 import org.opendaylight.genius.mdsalutil.interfaces.IMdsalApiManager;
54 import org.opendaylight.infrautils.inject.AbstractLifecycle;
55 import org.opendaylight.infrautils.utils.concurrent.JdkFutures;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.BucketId;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.BucketKey;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.TransmitPacketOutput;
81 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
82 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
83 import org.opendaylight.yangtools.yang.common.RpcResult;
84 import org.slf4j.Logger;
85 import org.slf4j.LoggerFactory;
88 @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
89 public class MDSALManager extends AbstractLifecycle implements IMdsalApiManager {
91 private static final Logger LOG = LoggerFactory.getLogger(MDSALManager.class);
93 private final DataBroker dataBroker;
94 private final RetryingManagedNewTransactionRunner txRunner;
95 private final FlowBatchingUtils flowBatchingUtils = new FlowBatchingUtils();
97 private final PacketProcessingService packetProcessingService;
98 private final ConcurrentMap<FlowInfoKey, Runnable> flowMap = new ConcurrentHashMap<>();
99 private final ConcurrentMap<GroupInfoKey, Runnable> groupMap = new ConcurrentHashMap<>();
100 private final ExecutorService executorService = newListeningSingleThreadExecutor("genius-MDSALManager", LOG);
101 private final SingleTransactionDataBroker singleTxDb;
102 private final FlowListener flowListener = new FlowListener();
103 private final FlowConfigListener flowConfigListener = new FlowConfigListener();
104 private final GroupListener groupListener = new GroupListener();
107 * Writes the flows and Groups to the MD SAL DataStore which will be sent to
108 * the openflowplugin for installing flows/groups on the switch. Other
109 * modules of VPN service that wants to install flows / groups on the switch
113 * dataBroker reference
114 * @param pktProcService
115 * PacketProcessingService for sending the packet outs
118 public MDSALManager(DataBroker db, PacketProcessingService pktProcService) {
119 this.dataBroker = db;
120 this.txRunner = new RetryingManagedNewTransactionRunner(db);
121 this.packetProcessingService = pktProcService;
122 singleTxDb = new SingleTransactionDataBroker(dataBroker);
123 LOG.info("MDSAL Manager Initialized ");
127 protected void start() throws Exception {
128 LOG.info("{} start", getClass().getSimpleName());
130 int batchSize = Integer.getInteger("batch.size", 1000);
131 int batchInterval = Integer.getInteger("batch.wait.time", 500);
133 flowBatchingUtils.registerWithBatchManager(new MdSalUtilBatchHandler(dataBroker, batchSize, batchInterval));
134 flowListener.registerListener(LogicalDatastoreType.OPERATIONAL, dataBroker);
135 flowConfigListener.registerListener(LogicalDatastoreType.CONFIGURATION, dataBroker);
136 groupListener.registerListener(LogicalDatastoreType.OPERATIONAL, dataBroker);
140 protected void stop() throws Exception {
141 LOG.info("{} stop", getClass().getSimpleName());
143 flowListener.close();
144 flowConfigListener.close();
145 groupListener.close();
149 FluentFuture<Void> installFlowInternal(FlowEntity flowEntity) {
150 return addCallBackForInstallFlowAndReturn(txRunner
151 .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
152 tx -> writeFlowEntityInternal(flowEntity, tx)));
155 private FluentFuture<Void> installFlowInternal(BigInteger dpId, Flow flow) {
156 return txRunner.callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
157 tx -> writeFlowInternal(dpId, flow, tx));
160 private void writeFlowEntityInternal(FlowEntity flowEntity, WriteTransaction tx) {
161 FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
162 FlowBuilder flowbld = flowEntity.getFlowBuilder();
163 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(flowEntity.getDpnId(),
164 flowEntity.getTableId(), flowKey);
165 tx.put(LogicalDatastoreType.CONFIGURATION, flowInstanceId, flowbld.build(), true);
168 private void writeFlowEntityInternal(FlowEntity flowEntity, TypedWriteTransaction<Datastore.Configuration> tx) {
169 FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
170 FlowBuilder flowbld = flowEntity.getFlowBuilder();
171 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(flowEntity.getDpnId(),
172 flowEntity.getTableId(), flowKey);
173 tx.put(flowInstanceId, flowbld.build(), true);
176 private void writeFlowInternal(BigInteger dpId, Flow flow, WriteTransaction tx) {
177 FlowKey flowKey = new FlowKey(new FlowId(flow.getId()));
178 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, flow.getTableId(), flowKey);
179 tx.put(LogicalDatastoreType.CONFIGURATION, flowInstanceId, flow, true);
182 private void writeFlowInternal(BigInteger dpId, Flow flow, TypedWriteTransaction<Datastore.Configuration> tx) {
183 FlowKey flowKey = new FlowKey(new FlowId(flow.getId()));
184 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, flow.getTableId(), flowKey);
185 tx.put(flowInstanceId, flow, true);
188 private void batchedAddFlowInternal(BigInteger dpId, Flow flow) {
189 FlowKey flowKey = new FlowKey(new FlowId(flow.getId()));
190 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, flow.getTableId(), flowKey);
191 flowBatchingUtils.write(flowInstanceId, flow);
194 private void batchedRemoveFlowInternal(BigInteger dpId, Flow flow) {
195 FlowKey flowKey = new FlowKey(new FlowId(flow.getId()));
196 short tableId = flow.getTableId();
197 if (flowExists(dpId, tableId, flowKey)) {
198 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
199 flowBatchingUtils.delete(flowInstanceId);
201 LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
206 FluentFuture<Void> installGroupInternal(GroupEntity groupEntity) {
207 return addCallBackForInstallGroupAndReturn(txRunner
208 .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
209 tx -> writeGroupEntityInternal(groupEntity, tx)));
212 private void writeGroupEntityInternal(GroupEntity groupEntity, WriteTransaction tx) {
213 Group group = groupEntity.getGroupBuilder().build();
214 Node nodeDpn = buildDpnNode(groupEntity.getDpnId());
215 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupEntity.getGroupId(), nodeDpn);
216 tx.put(LogicalDatastoreType.CONFIGURATION, groupInstanceId, group, true);
219 private void writeGroupEntityInternal(GroupEntity groupEntity, TypedWriteTransaction<Datastore.Configuration> tx) {
220 Group group = groupEntity.getGroupBuilder().build();
221 Node nodeDpn = buildDpnNode(groupEntity.getDpnId());
222 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupEntity.getGroupId(), nodeDpn);
223 tx.put(groupInstanceId, group, true);
226 private void writeGroupInternal(BigInteger dpId, Group group, WriteTransaction tx) {
227 Node nodeDpn = buildDpnNode(dpId);
228 long groupId = group.getGroupId().getValue();
229 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
230 tx.put(LogicalDatastoreType.CONFIGURATION, groupInstanceId, group, true);
234 FluentFuture<Void> removeFlowInternal(FlowEntity flowEntity) {
235 return addCallBackForDeleteFlowAndReturnm(txRunner
236 .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
237 tx -> deleteFlowEntityInternal(flowEntity, tx)));
240 private void deleteFlowEntityInternal(FlowEntity flowEntity, WriteTransaction tx) {
241 BigInteger dpId = flowEntity.getDpnId();
242 short tableId = flowEntity.getTableId();
243 FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
244 deleteFlow(dpId, tableId, flowKey, tx);
247 private void deleteFlowEntityInternal(FlowEntity flowEntity, TypedWriteTransaction<Datastore.Configuration> tx) {
248 BigInteger dpId = flowEntity.getDpnId();
249 short tableId = flowEntity.getTableId();
250 FlowKey flowKey = new FlowKey(new FlowId(flowEntity.getFlowId()));
251 deleteFlow(dpId, tableId, flowKey, tx);
254 private void deleteFlow(BigInteger dpId, short tableId, FlowKey flowKey, WriteTransaction tx) {
255 if (flowExists(dpId, tableId, flowKey)) {
256 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
257 tx.delete(LogicalDatastoreType.CONFIGURATION, flowInstanceId);
259 LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
263 private void deleteFlow(BigInteger dpId, short tableId, FlowKey flowKey,
264 TypedWriteTransaction<Datastore.Configuration> tx) {
265 if (flowExists(dpId, tableId, flowKey)) {
266 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
267 tx.delete(flowInstanceId);
269 LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
273 private FluentFuture<Void> removeFlowNewInternal(BigInteger dpnId, Flow flowEntity) {
274 LOG.debug("Remove flow {}", flowEntity);
275 return txRunner.callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
277 FlowKey flowKey = new FlowKey(flowEntity.getId());
278 short tableId = flowEntity.getTableId();
279 deleteFlow(dpnId, tableId, flowKey, tx);
283 private void deleteFlowInternal(BigInteger dpId, Flow flow, WriteTransaction tx) {
284 FlowKey flowKey = new FlowKey(flow.getId());
285 short tableId = flow.getTableId();
286 deleteFlow(dpId, tableId, flowKey, tx);
290 FluentFuture<Void> removeGroupInternal(BigInteger dpnId, long groupId) {
291 return addCallBackForInstallGroupAndReturn(txRunner
292 .callWithNewWriteOnlyTransactionAndSubmit(Datastore.CONFIGURATION,
293 tx -> removeGroupInternal(dpnId, groupId, tx)));
296 private void removeGroupInternal(BigInteger dpnId, long groupId, WriteTransaction tx) {
297 Node nodeDpn = buildDpnNode(dpnId);
298 if (groupExists(nodeDpn, groupId)) {
299 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
300 tx.delete(LogicalDatastoreType.CONFIGURATION, groupInstanceId);
302 LOG.debug("Group {} does not exist for dpn {}", groupId, dpnId);
306 private void removeGroupInternal(BigInteger dpnId, long groupId,
307 TypedWriteTransaction<Datastore.Configuration> tx) {
308 Node nodeDpn = buildDpnNode(dpnId);
309 if (groupExists(nodeDpn, groupId)) {
310 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
311 tx.delete(groupInstanceId);
313 LOG.debug("Group {} does not exist for dpn {}", groupId, dpnId);
318 private void modifyGroupInternal(GroupEntity groupEntity) {
320 installGroup(groupEntity);
323 private void sendPacketOutInternal(BigInteger dpnId, int groupId, byte[] payload) {
325 List<ActionInfo> actionInfos = new ArrayList<>();
326 actionInfos.add(new ActionGroup(groupId));
328 sendPacketOutWithActions(dpnId, payload, actionInfos);
331 private void sendPacketOutWithActionsInternal(BigInteger dpnId, byte[] payload, List<ActionInfo> actionInfos) {
332 ListenableFuture<RpcResult<TransmitPacketOutput>> future = packetProcessingService.transmitPacket(
333 MDSALUtil.getPacketOut(actionInfos, payload, dpnId,
334 getNodeConnRef("openflow:" + dpnId, "0xfffffffd")));
335 JdkFutures.addErrorLogging(future, LOG, "Transmit packet");
338 private void sendARPPacketOutWithActionsInternal(BigInteger dpnId, byte[] payload, List<ActionInfo> actions) {
339 sendPacketOutWithActionsInternal(dpnId, payload, actions);
342 protected InstanceIdentifier<Node> nodeToInstanceId(Node node) {
343 return InstanceIdentifier.builder(Nodes.class).child(Node.class, node.key()).build();
346 private static NodeConnectorRef getNodeConnRef(final String nodeId, final String port) {
347 StringBuilder stringBuilder = new StringBuilder(nodeId);
348 StringBuilder append = stringBuilder.append(":");
349 StringBuilder build = append.append(port);
350 String string = build.toString();
351 NodeConnectorId nodeConnectorId = new NodeConnectorId(string);
352 NodeConnectorKey nodeConnectorKey = new NodeConnectorKey(nodeConnectorId);
353 NodeConnectorKey connectorKey = nodeConnectorKey;
354 InstanceIdentifierBuilder<Nodes> builder = InstanceIdentifier.builder(Nodes.class);
356 NodeKey nodeKey = new NodeKey(new NodeId(nodeId));
357 InstanceIdentifierBuilder<Node> child = builder.child(Node.class, nodeKey);
358 InstanceIdentifierBuilder<NodeConnector> anotherChild = child.child(NodeConnector.class, connectorKey);
359 InstanceIdentifier<NodeConnector> path = anotherChild.build();
360 NodeConnectorRef nodeConnectorRef = new NodeConnectorRef(path);
361 return nodeConnectorRef;
364 private static Node buildDpnNode(BigInteger dpnId) {
365 NodeId nodeId = new NodeId("openflow:" + dpnId);
366 Node nodeDpn = new NodeBuilder().setId(nodeId).withKey(new NodeKey(nodeId)).build();
371 private String getGroupKey(long groupId, BigInteger dpId) {
372 String synchronizingKey = "group-key-" + groupId + dpId;
373 return synchronizingKey.intern();
376 private String getFlowKey(BigInteger dpId, short tableId, FlowKey flowKey) {
377 String synchronizingKey = "flow-key-" + dpId + tableId + flowKey;
378 return synchronizingKey.intern();
381 private void syncSetUpFlowInternal(FlowEntity flowEntity, boolean isRemove) {
382 if (LOG.isTraceEnabled()) {
383 LOG.trace("syncSetUpFlow for flowEntity {} ", flowEntity);
385 Flow flow = flowEntity.getFlowBuilder().build();
386 String flowId = flowEntity.getFlowId();
387 short tableId = flowEntity.getTableId();
388 BigInteger dpId = flowEntity.getDpnId();
389 FlowKey flowKey = new FlowKey(new FlowId(flowId));
390 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
393 synchronized (getFlowKey(dpId, tableId, flowKey)) {
394 if (flowExists(dpId, tableId, flowKey)) {
395 MDSALUtil.syncDelete(dataBroker, LogicalDatastoreType.CONFIGURATION, flowInstanceId);
397 LOG.debug("Flow {} does not exist for dpn {}", flowKey, dpId);
401 MDSALUtil.syncWrite(dataBroker, LogicalDatastoreType.CONFIGURATION, flowInstanceId, flow);
405 private void syncSetUpGroupInternal(GroupEntity groupEntity, boolean isRemove) {
406 if (LOG.isTraceEnabled()) {
407 LOG.trace("syncSetUpGroup for groupEntity {} ", groupEntity);
409 Group group = groupEntity.getGroupBuilder().build();
410 BigInteger dpId = groupEntity.getDpnId();
411 long groupId = groupEntity.getGroupId();
412 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, buildDpnNode(dpId));
415 synchronized (getGroupKey(groupId, dpId)) {
416 if (groupExists(dpId, groupId)) {
417 MDSALUtil.syncDelete(dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId);
419 LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
423 MDSALUtil.syncWrite(dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId, group);
427 private void syncSetUpGroupInternal(BigInteger dpId, Group group, boolean isRemove) {
428 LOG.trace("syncSetUpGroup for group {} ", group);
429 long groupId = group.getGroupId().getValue();
430 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, buildDpnNode(dpId));
433 synchronized (getGroupKey(groupId, dpId)) {
434 if (groupExists(dpId, groupId)) {
435 MDSALUtil.syncDelete(dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId);
437 LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
441 MDSALUtil.syncWrite(dataBroker, LogicalDatastoreType.CONFIGURATION, groupInstanceId, group);
445 private class GroupListener extends AsyncClusteredDataTreeChangeListenerBase<Group, GroupListener> {
448 super(Group.class, GroupListener.class);
452 protected void remove(InstanceIdentifier<Group> identifier, Group del) {
453 BigInteger dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
454 executeNotifyTaskIfRequired(dpId, del);
457 private void executeNotifyTaskIfRequired(BigInteger dpId, Group group) {
458 GroupInfoKey groupKey = new GroupInfoKey(dpId, group.getGroupId().getValue());
459 Runnable notifyTask = groupMap.remove(groupKey);
460 if (notifyTask == null) {
463 executorService.execute(notifyTask);
467 protected void update(InstanceIdentifier<Group> identifier, Group original, Group update) {
468 BigInteger dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
469 executeNotifyTaskIfRequired(dpId, update);
473 protected void add(InstanceIdentifier<Group> identifier, Group add) {
474 BigInteger dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
475 executeNotifyTaskIfRequired(dpId, add);
479 protected InstanceIdentifier<Group> getWildCardPath() {
480 return InstanceIdentifier.create(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class)
485 protected GroupListener getDataTreeChangeListener() {
486 return GroupListener.this;
490 private class FlowListener extends AsyncClusteredDataTreeChangeListenerBase<Flow, FlowListener> {
493 super(Flow.class, FlowListener.class);
497 protected void remove(InstanceIdentifier<Flow> identifier, Flow del) {
498 BigInteger dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
499 notifyTaskIfRequired(dpId, del);
502 private void notifyTaskIfRequired(BigInteger dpId, Flow flow) {
503 FlowInfoKey flowKey = new FlowInfoKey(dpId, flow.getTableId(), flow.getMatch(), flow.getId().getValue());
504 Runnable notifyTask = flowMap.remove(flowKey);
505 if (notifyTask == null) {
508 executorService.execute(notifyTask);
512 protected void update(InstanceIdentifier<Flow> identifier, Flow original, Flow update) {
516 protected void add(InstanceIdentifier<Flow> identifier, Flow add) {
517 BigInteger dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
518 notifyTaskIfRequired(dpId, add);
522 protected InstanceIdentifier<Flow> getWildCardPath() {
523 return InstanceIdentifier.create(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class)
524 .child(Table.class).child(Flow.class);
528 protected FlowListener getDataTreeChangeListener() {
529 return FlowListener.this;
533 private class FlowConfigListener extends AsyncClusteredDataTreeChangeListenerBase<Flow, FlowConfigListener> {
534 private final Logger flowLog = LoggerFactory.getLogger(FlowConfigListener.class);
536 FlowConfigListener() {
537 super(Flow.class, FlowConfigListener.class);
541 protected void remove(InstanceIdentifier<Flow> identifier, Flow del) {
542 BigInteger dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
543 flowLog.trace("FlowId {} deleted from Table {} on DPN {}",
544 del.getId().getValue(), del.getTableId(), dpId);
548 protected void update(InstanceIdentifier<Flow> identifier, Flow original, Flow update) {
552 protected void add(InstanceIdentifier<Flow> identifier, Flow add) {
553 BigInteger dpId = getDpnFromString(identifier.firstKeyOf(Node.class).getId().getValue());
554 flowLog.debug("FlowId {} added to Table {} on DPN {}",
555 add.getId().getValue(), add.getTableId(), dpId);
559 protected InstanceIdentifier<Flow> getWildCardPath() {
560 return InstanceIdentifier.create(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class)
561 .child(Table.class).child(Flow.class);
565 protected FlowConfigListener getDataTreeChangeListener() {
566 return FlowConfigListener.this;
570 private static BigInteger getDpnFromString(String dpnString) {
571 String[] split = dpnString.split(":");
572 return new BigInteger(split[1]);
576 public CheckedFuture<Void, TransactionCommitFailedException> installFlow(FlowEntity flowEntity) {
577 return toChecked(installFlowInternal(flowEntity),
578 t -> new TransactionCommitFailedException("installFlow failed", t));
582 public CheckedFuture<Void, TransactionCommitFailedException> installFlow(BigInteger dpId, Flow flowEntity) {
583 return toChecked(installFlowInternal(dpId, flowEntity),
584 t -> new TransactionCommitFailedException("installFlow failed", t));
588 public CheckedFuture<Void, TransactionCommitFailedException> installFlow(BigInteger dpId, FlowEntity flowEntity) {
589 return toChecked(installFlowInternal(dpId, flowEntity.getFlowBuilder().build()),
590 t -> new TransactionCommitFailedException("installFlow failed", t));
594 public ListenableFuture<Void> removeFlow(BigInteger dpId, short tableId, FlowId flowId) {
595 ListenableFuture<Void> future = txRunner.callWithNewWriteOnlyTransactionAndSubmit(
596 tx -> deleteFlow(dpId, tableId, new FlowKey(flowId), tx));
598 Futures.addCallback(future, new FutureCallback<Void>() {
600 public void onSuccess(final Void result) {
601 // Committed successfully
602 LOG.debug("Delete Flow -- Committed successfully");
606 public void onFailure(final Throwable throwable) {
607 // Transaction failed
608 if (throwable instanceof OptimisticLockFailedException) {
609 // Failed because of concurrent transaction modifying same
611 LOG.error("Delete Flow -- Failed because of concurrent transaction modifying same data");
613 // Some other type of TransactionCommitFailedException
614 LOG.error("Delete Flow -- Some other type of TransactionCommitFailedException", throwable);
618 }, MoreExecutors.directExecutor());
624 public CheckedFuture<Void, TransactionCommitFailedException> removeFlow(BigInteger dpId, Flow flowEntity) {
625 return toChecked(removeFlowNewInternal(dpId, flowEntity),
626 t -> new TransactionCommitFailedException("removeFlow failed", t));
630 public CheckedFuture<Void, TransactionCommitFailedException> removeFlow(BigInteger dpId, FlowEntity flowEntity) {
631 return toChecked(removeFlowNewInternal(dpId, flowEntity.getFlowBuilder().build()),
632 t -> new TransactionCommitFailedException("removeFlow failed", t));
636 public CheckedFuture<Void, TransactionCommitFailedException> removeFlow(FlowEntity flowEntity) {
637 return toChecked(removeFlowInternal(flowEntity),
638 t -> new TransactionCommitFailedException("removeFlow failed", t));
642 public void removeFlow(TypedReadWriteTransaction<Configuration> tx, FlowEntity flowEntity)
643 throws ExecutionException, InterruptedException {
644 removeFlow(tx, flowEntity.getDpnId(), flowEntity.getFlowId(), flowEntity.getTableId());
648 public void removeFlow(TypedReadWriteTransaction<Configuration> tx, BigInteger dpId, Flow flow)
649 throws ExecutionException, InterruptedException {
650 removeFlow(tx, dpId, flow.key(), flow.getTableId());
654 public void removeFlow(TypedReadWriteTransaction<Configuration> tx, BigInteger dpId, String flowId, short tableId)
655 throws ExecutionException, InterruptedException {
656 removeFlow(tx, dpId, new FlowKey(new FlowId(flowId)), tableId);
660 public void removeFlow(TypedReadWriteTransaction<Configuration> tx, BigInteger dpId, FlowKey flowKey,
661 short tableId) throws ExecutionException, InterruptedException {
662 InstanceIdentifier<Flow> flowInstanceIdentifier = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
663 if (tx.read(flowInstanceIdentifier).get().isPresent()) {
664 tx.delete(flowInstanceIdentifier);
669 public void installGroup(GroupEntity groupEntity) {
670 installGroupInternal(groupEntity);
674 public void modifyGroup(GroupEntity groupEntity) {
675 modifyGroupInternal(groupEntity);
679 public void removeGroup(GroupEntity groupEntity) {
680 removeGroupInternal(groupEntity.getDpnId(), groupEntity.getGroupId());
684 public void removeGroup(BigInteger dpnId, long groupId) {
685 removeGroupInternal(dpnId, groupId);
689 public void removeGroup(BigInteger dpnId, long groupId, WriteTransaction tx) {
690 removeGroupInternal(dpnId, groupId, tx);
694 public void removeGroup(TypedReadWriteTransaction<Configuration> tx, GroupEntity groupEntity)
695 throws ExecutionException, InterruptedException {
696 removeGroup(tx, groupEntity.getDpnId(), groupEntity.getGroupId());
700 public void removeGroup(TypedReadWriteTransaction<Configuration> tx, BigInteger dpId, Group group)
701 throws ExecutionException, InterruptedException {
702 removeGroup(tx, dpId, group.getGroupId().getValue());
706 public void removeGroup(TypedReadWriteTransaction<Configuration> tx, BigInteger dpId, long groupId)
707 throws ExecutionException, InterruptedException {
708 Node nodeDpn = buildDpnNode(dpId);
709 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
710 if (tx.read(groupInstanceId).get().isPresent()) {
711 tx.delete(groupInstanceId);
713 LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
718 public void sendPacketOut(BigInteger dpnId, int groupId, byte[] payload) {
719 sendPacketOutInternal(dpnId, groupId, payload);
723 public void sendPacketOutWithActions(BigInteger dpnId, long groupId, byte[] payload, List<ActionInfo> actionInfos) {
724 sendPacketOutWithActionsInternal(dpnId, payload, actionInfos);
728 public void sendPacketOutWithActions(BigInteger dpnId, byte[] payload, List<ActionInfo> actionInfos) {
729 sendPacketOutWithActionsInternal(dpnId, payload, actionInfos);
733 public void sendARPPacketOutWithActions(BigInteger dpnId, byte[] payload, List<ActionInfo> actionInfo) {
734 sendARPPacketOutWithActionsInternal(dpnId, payload, actionInfo);
738 public void syncRemoveFlow(FlowEntity flowEntity, long delayTime) {
739 syncSetUpFlowInternal(flowEntity, true);
743 public void syncRemoveFlow(FlowEntity flowEntity) {
744 syncSetUpFlowInternal(flowEntity, true);
748 public void syncInstallFlow(FlowEntity flowEntity, long delayTime) {
749 syncSetUpFlowInternal(flowEntity, false);
753 public void syncInstallFlow(FlowEntity flowEntity) {
754 syncSetUpFlowInternal(flowEntity, false);
758 public void syncInstallGroup(GroupEntity groupEntity, long delayTime) {
759 syncSetUpGroupInternal(groupEntity, false);
763 public void syncInstallGroup(GroupEntity groupEntity) {
764 syncSetUpGroupInternal(groupEntity, false);
768 public void syncInstallGroup(BigInteger dpId, Group group, long delayTime) {
769 syncSetUpGroupInternal(dpId, group, false);
773 public void syncInstallGroup(BigInteger dpId, Group group) {
774 syncSetUpGroupInternal(dpId, group, false);
778 public void syncRemoveGroup(GroupEntity groupEntity) {
779 syncSetUpGroupInternal(groupEntity, true);
783 public void syncRemoveGroup(BigInteger dpId, Group group) {
784 syncSetUpGroupInternal(dpId, group, true);
788 public void addFlowToTx(FlowEntity flowEntity, WriteTransaction tx) {
789 writeFlowEntityInternal(flowEntity, tx);
793 public void addFlowToTx(BigInteger dpId, Flow flow, WriteTransaction tx) {
794 writeFlowInternal(dpId, flow, tx);
798 public void addFlow(TypedWriteTransaction<Configuration> tx, FlowEntity flowEntity) {
799 addFlow(tx, flowEntity.getDpnId(), flowEntity.getFlowBuilder().build());
803 public void addFlow(TypedWriteTransaction<Configuration> tx, BigInteger dpId, Flow flow) {
804 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, flow.getTableId(), flow.key());
805 tx.put(flowInstanceId, flow, CREATE_MISSING_PARENTS);
809 public void removeFlowToTx(BigInteger dpId, Flow flow, WriteTransaction tx) {
810 deleteFlowInternal(dpId, flow, tx);
814 public void removeFlowToTx(FlowEntity flowEntity, WriteTransaction tx) {
815 deleteFlowEntityInternal(flowEntity, tx);
819 public void addGroupToTx(GroupEntity groupEntity, WriteTransaction tx) {
820 writeGroupEntityInternal(groupEntity, tx);
824 public void addGroupToTx(BigInteger dpId, Group group, WriteTransaction tx) {
825 writeGroupInternal(dpId, group, tx);
829 public void addGroup(TypedWriteTransaction<Configuration> tx, GroupEntity groupEntity) {
830 addGroup(tx, groupEntity.getDpnId(), groupEntity.getGroupBuilder().build());
834 public void addGroup(TypedWriteTransaction<Configuration> tx, BigInteger dpId, Group group) {
835 Node nodeDpn = buildDpnNode(dpId);
836 long groupId = group.getGroupId().getValue();
837 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
838 tx.put(groupInstanceId, group, CREATE_MISSING_PARENTS);
842 public void removeGroupToTx(GroupEntity groupEntity, WriteTransaction tx) {
843 removeGroupInternal(groupEntity.getDpnId(), groupEntity.getGroupId(), tx);
847 public void removeGroupToTx(BigInteger dpId, Group group, WriteTransaction tx) {
848 removeGroupInternal(dpId, group.getGroupId().getValue(), tx);
852 public void batchedAddFlow(BigInteger dpId, FlowEntity flowEntity) {
853 batchedAddFlowInternal(dpId, flowEntity.getFlowBuilder().build());
857 public void batchedAddFlow(BigInteger dpId, Flow flow) {
858 batchedAddFlowInternal(dpId, flow);
862 public void batchedRemoveFlow(BigInteger dpId, FlowEntity flowEntity) {
863 batchedRemoveFlowInternal(dpId, flowEntity.getFlowBuilder().build());
867 public void batchedRemoveFlow(BigInteger dpId, Flow flow) {
868 batchedRemoveFlowInternal(dpId, flow);
872 public void addBucketToTx(BigInteger dpId, long groupId, Bucket bucket, WriteTransaction tx) {
873 addBucket(dpId, groupId, bucket, tx);
877 public void addBucket(TypedReadWriteTransaction<Configuration> tx, BigInteger dpId, long groupId, Bucket bucket)
878 throws ExecutionException, InterruptedException {
879 Node nodeDpn = buildDpnNode(dpId);
880 if (groupExists(tx, nodeDpn, groupId)) {
881 InstanceIdentifier<Bucket> bucketInstanceId = buildBucketInstanceIdentifier(groupId,
882 bucket.getBucketId().getValue(), nodeDpn);
883 tx.put(bucketInstanceId, bucket);
887 private void addBucket(BigInteger dpId, long groupId, Bucket bucket, WriteTransaction tx) {
888 Node nodeDpn = buildDpnNode(dpId);
889 if (groupExists(nodeDpn, groupId)) {
890 InstanceIdentifier<Bucket> bucketInstanceId = buildBucketInstanceIdentifier(groupId,
891 bucket.getBucketId().getValue(), nodeDpn);
892 tx.put(LogicalDatastoreType.CONFIGURATION, bucketInstanceId, bucket);
897 public void removeBucketToTx(BigInteger dpId, long groupId, long bucketId, WriteTransaction tx) {
898 deleteBucket(dpId, groupId, bucketId, tx);
902 public void removeBucket(TypedReadWriteTransaction<Configuration> tx, BigInteger dpId, long groupId, long bucketId)
903 throws ExecutionException, InterruptedException {
904 Node nodeDpn = buildDpnNode(dpId);
905 if (groupExists(tx, nodeDpn, groupId)) {
906 InstanceIdentifier<Bucket> bucketInstanceId = buildBucketInstanceIdentifier(groupId, bucketId, nodeDpn);
907 tx.delete(bucketInstanceId);
909 LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
913 private void deleteBucket(BigInteger dpId, long groupId, long bucketId, WriteTransaction tx) {
914 Node nodeDpn = buildDpnNode(dpId);
915 if (groupExists(nodeDpn, groupId)) {
916 InstanceIdentifier<Bucket> bucketInstanceId = buildBucketInstanceIdentifier(groupId, bucketId, nodeDpn);
917 tx.delete(LogicalDatastoreType.CONFIGURATION, bucketInstanceId);
919 LOG.debug("Group {} does not exist for dpn {}", groupId, dpId);
924 public boolean groupExists(BigInteger dpId, long groupId) {
925 return groupExists(buildDpnNode(dpId), groupId);
928 private boolean groupExists(Node nodeDpn, long groupId) {
929 InstanceIdentifier<Group> groupInstanceId = buildGroupInstanceIdentifier(groupId, nodeDpn);
931 return singleTxDb.syncReadOptional(LogicalDatastoreType.CONFIGURATION, groupInstanceId).isPresent();
932 } catch (ReadFailedException e) {
933 LOG.warn("Exception while reading group {} for Node {}", groupId, nodeDpn.key());
938 private boolean groupExists(TypedReadTransaction<Configuration> tx, Node nodeDpn, long groupId)
939 throws ExecutionException, InterruptedException {
940 return tx.read(buildGroupInstanceIdentifier(groupId, nodeDpn)).get().isPresent();
943 private InstanceIdentifier<Group> buildGroupInstanceIdentifier(long groupId, Node nodeDpn) {
944 InstanceIdentifier<Group> groupInstanceId = InstanceIdentifier.builder(Nodes.class)
945 .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
946 .child(Group.class, new GroupKey(new GroupId(groupId))).build();
947 return groupInstanceId;
950 private boolean flowExists(BigInteger dpId, short tableId, FlowKey flowKey) {
951 InstanceIdentifier<Flow> flowInstanceId = buildFlowInstanceIdentifier(dpId, tableId, flowKey);
953 Optional<Flow> flowOptional = singleTxDb.syncReadOptional(LogicalDatastoreType.CONFIGURATION,
955 return flowOptional.isPresent();
956 } catch (ReadFailedException e) {
957 LOG.warn("Exception while reading flow {} for dpn {}", flowKey, dpId);
962 private static InstanceIdentifier<Flow> buildFlowInstanceIdentifier(BigInteger dpnId, short tableId,
964 InstanceIdentifier<Flow> flowInstanceId = InstanceIdentifier.builder(Nodes.class)
965 .child(Node.class, buildDpnNode(dpnId).key()).augmentation(FlowCapableNode.class)
966 .child(Table.class, new TableKey(tableId)).child(Flow.class, flowKey).build();
967 return flowInstanceId;
970 private InstanceIdentifier<Bucket> buildBucketInstanceIdentifier(long groupId, long bucketId,
972 InstanceIdentifier<Bucket> bucketInstanceId = InstanceIdentifier.builder(Nodes.class)
973 .child(Node.class, nodeDpn.key()).augmentation(FlowCapableNode.class)
974 .child(Group.class, new GroupKey(new GroupId(groupId)))
975 .child(Buckets.class)
976 .child(Bucket.class, new BucketKey(new BucketId(bucketId))).build();
977 return bucketInstanceId;
980 private FluentFuture<Void> addCallBackForDeleteFlowAndReturnm(FluentFuture<Void> fluentFuture) {
981 return callBack(fluentFuture, "Delete Flow");
984 private FluentFuture<Void> addCallBackForInstallFlowAndReturn(FluentFuture<Void> fluentFuture) {
985 return callBack(fluentFuture, "Install Flow");
988 private FluentFuture<Void> addCallBackForInstallGroupAndReturn(FluentFuture<Void> fluentFuture) {
989 return callBack(fluentFuture, "Install Group");
992 // Generic for handling callbacks
993 private FluentFuture<Void> callBack(FluentFuture<Void> fluentFuture, String log) {
994 fluentFuture.addCallback(new FutureCallback<Void>() {
996 public void onSuccess(final Void result) {
997 // Committed successfully
998 LOG.debug("{} -- Committedsuccessfully ", log);
1002 public void onFailure(final Throwable throwable) {
1003 // Transaction failed
1005 if (throwable instanceof OptimisticLockFailedException) {
1006 // Failed because of concurrent transaction modifying same
1008 LOG.error("{} -- Failed because of concurrent transaction modifying same data", log);
1010 // Some other type of TransactionCommitFailedException
1011 LOG.error("{} -- Some other type of TransactionCommitFailedException",log, throwable);
1014 }, MoreExecutors.directExecutor());
1015 return fluentFuture;