2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.applications.frm.impl;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.collect.Lists;
15 import com.google.common.util.concurrent.CheckedFuture;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.List;
21 import java.util.ListIterator;
24 import java.util.concurrent.Callable;
25 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
26 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
27 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
28 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
30 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
33 import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation;
34 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
35 import org.opendaylight.openflowplugin.common.wait.SimpleTaskRetryLooper;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.GroupActionCase;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.Action;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.MeterKey;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.StaleMeter;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.StaleMeterKey;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlow;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlowKey;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.StaleGroup;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.StaleGroupKey;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeaturesKey;
64 import org.opendaylight.yangtools.concepts.ListenerRegistration;
65 import org.opendaylight.yangtools.yang.binding.DataObject;
66 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
67 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
73 * forwardingrules-manager
74 * org.opendaylight.openflowplugin.applications.frm
76 * FlowNode Reconciliation Listener
77 * Reconciliation for a new FlowNode
79 * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
81 * Created: Jun 13, 2014
83 public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
/* Class-wide SLF4J logger. */
85 private static final Logger LOG = LoggerFactory.getLogger(FlowNodeReconciliationImpl.class);
/* Data broker kept from the constructor; used to open the write-only transactions that delete stale entries from the configuration datastore. */
87 private final DataBroker dataBroker;
/* Forwarding rules manager: node registration, ownership check, configuration, read transactions and the flow/group/meter/table-features commiters are all reached through it. */
89 private final ForwardingRulesManager provider;
/* Handle of the data change listener registration; assigned in the constructor, closed and reset to null by close(). */
91 private ListenerRegistration<DataChangeListener> listenerRegistration;
/**
 * Creates the reconciliation listener and registers it on the operational datastore.
 *
 * <p>The listener is registered with {@link DataChangeScope#BASE} on the wildcarded
 * {@code Nodes/Node/FlowCapableNode} path. Registration is retried through a
 * {@link SimpleTaskRetryLooper} configured with the startup tick and retry count of
 * {@code ForwardingRulesManagerImpl}.
 *
 * @param manager forwarding rules manager, must not be null
 * @param db      data broker, must not be null
 * @throws IllegalStateException if the listener could not be registered
 */
public FlowNodeReconciliationImpl (final ForwardingRulesManager manager, final DataBroker db) {
    this.provider = Preconditions.checkNotNull(manager, "ForwardingRulesManager can not be null!");
    dataBroker = Preconditions.checkNotNull(db, "DataBroker can not be null!");

    /* Wildcarded path matching the FlowCapableNode augmentation of every inventory node */
    final InstanceIdentifier<FlowCapableNode> flowNodeWildCardIdentifier =
            InstanceIdentifier.create(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class);

    final SimpleTaskRetryLooper retryLooper = new SimpleTaskRetryLooper(
            ForwardingRulesManagerImpl.STARTUP_LOOP_TICK,
            ForwardingRulesManagerImpl.STARTUP_LOOP_MAX_RETRIES);

    final Callable<ListenerRegistration<DataChangeListener>> registrationTask =
            new Callable<ListenerRegistration<DataChangeListener>>() {
                @Override
                public ListenerRegistration<DataChangeListener> call() throws Exception {
                    return db.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
                            flowNodeWildCardIdentifier, FlowNodeReconciliationImpl.this, DataChangeScope.BASE);
                }
            };

    try {
        listenerRegistration = retryLooper.loopUntilNoException(registrationTask);
    } catch (Exception e) {
        LOG.warn("data listener registration failed: {}", e.getMessage());
        LOG.debug("data listener registration failed.. ", e);
        throw new IllegalStateException("FlowNodeReconciliation startup fail! System needs restart.", e);
    }
}
118 public void close() {
119 if (listenerRegistration != null) {
121 listenerRegistration.close();
122 } catch (Exception e) {
123 LOG.warn("Error by stop FRM FlowNodeReconilListener: {}", e.getMessage());
124 LOG.debug("Error by stop FRM FlowNodeReconilListener..", e);
126 listenerRegistration = null;
131 public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changeEvent) {
132 Preconditions.checkNotNull(changeEvent,"Async ChangeEvent can not be null!");
133 /* All DataObjects for create */
134 final Set<InstanceIdentifier<?>> createdData = changeEvent.getCreatedData() != null
135 ? changeEvent.getCreatedData().keySet() : Collections.<InstanceIdentifier<?>> emptySet();
136 /* All DataObjects for remove */
137 final Set<InstanceIdentifier<?>> removeData = changeEvent.getRemovedPaths() != null
138 ? changeEvent.getRemovedPaths() : Collections.<InstanceIdentifier<?>> emptySet();
139 /* All updated DataObjects */
140 final Map<InstanceIdentifier<?>, DataObject> updateData = changeEvent.getUpdatedData() != null
141 ? changeEvent.getUpdatedData() : Collections.<InstanceIdentifier<?>, DataObject>emptyMap();
143 for (InstanceIdentifier<?> entryKey : removeData) {
144 final InstanceIdentifier<FlowCapableNode> nodeIdent = entryKey
145 .firstIdentifierOf(FlowCapableNode.class);
146 if ( ! nodeIdent.isWildcarded()) {
147 flowNodeDisconnected(nodeIdent);
150 for (InstanceIdentifier<?> entryKey : createdData) {
151 final InstanceIdentifier<FlowCapableNode> nodeIdent = entryKey
152 .firstIdentifierOf(FlowCapableNode.class);
153 if ( ! nodeIdent.isWildcarded()) {
154 flowNodeConnected(nodeIdent);
158 // FIXME: just a hack to cover DS/operational dirty start
159 // if all conventional ways failed and there is update
160 if (removeData.isEmpty() && createdData.isEmpty() && updateData.size() == 1) {
161 for (Map.Entry<InstanceIdentifier<?>, DataObject> entry : updateData.entrySet()) {
162 // and only if this update covers top element (flow-capable-node)
163 if (FlowCapableNode.class.equals(entry.getKey().getTargetType())) {
164 final InstanceIdentifier<FlowCapableNode> nodeIdent = entry.getKey()
165 .firstIdentifierOf(FlowCapableNode.class);
166 if (!nodeIdent.isWildcarded()) {
167 // then force registration to local node cache and reconcile
168 flowNodeConnected(nodeIdent, true);
176 public void flowNodeDisconnected(InstanceIdentifier<FlowCapableNode> disconnectedNode) {
177 provider.unregistrateNode(disconnectedNode);
181 public void flowNodeConnected(InstanceIdentifier<FlowCapableNode> connectedNode) {
182 flowNodeConnected(connectedNode, false);
185 private void flowNodeConnected(InstanceIdentifier<FlowCapableNode> connectedNode, boolean force) {
186 if (force || !provider.isNodeActive(connectedNode)) {
187 provider.registrateNewNode(connectedNode);
189 if(!provider.isNodeOwner(connectedNode)) { return; }
191 if (provider.getConfiguration().isStaleMarkingEnabled()) {
192 LOG.info("Stale-Marking is ENABLED and proceeding with deletion of stale-marked entities on switch {}",
193 connectedNode.toString());
194 reconciliationPreProcess(connectedNode);
196 reconciliation(connectedNode);
200 private void reconciliation(final InstanceIdentifier<FlowCapableNode> nodeIdent) {
202 ReadOnlyTransaction trans = provider.getReadTranaction();
203 Optional<FlowCapableNode> flowNode = Optional.absent();
206 flowNode = trans.read(LogicalDatastoreType.CONFIGURATION, nodeIdent).get();
208 catch (Exception e) {
209 LOG.error("Fail with read Config/DS for Node {} !", nodeIdent, e);
212 if (flowNode.isPresent()) {
213 /* Tables - have to be pushed before groups */
214 // CHECK if while pusing the update, updateTableInput can be null to emulate a table add
215 List<Table> tableList = flowNode.get().getTable() != null
216 ? flowNode.get().getTable() : Collections.<Table> emptyList() ;
217 for (Table table : tableList) {
218 TableKey tableKey = table.getKey();
219 KeyedInstanceIdentifier<TableFeatures, TableFeaturesKey> tableFeaturesII
220 = nodeIdent.child(Table.class, tableKey).child(TableFeatures.class, new TableFeaturesKey(tableKey.getId()));
221 List<TableFeatures> tableFeatures = table.getTableFeatures();
222 if (tableFeatures != null) {
223 for (TableFeatures tableFeaturesItem : tableFeatures) {
224 provider.getTableFeaturesCommiter().update(tableFeaturesII, tableFeaturesItem, null, nodeIdent);
229 /* Groups - have to be first */
230 List<Group> groups = flowNode.get().getGroup() != null
231 ? flowNode.get().getGroup() : Collections.<Group>emptyList();
232 List<Group> toBeInstalledGroups = new ArrayList<>();
233 toBeInstalledGroups.addAll(groups);
234 List<Long> alreadyInstalledGroupids = new ArrayList<>();
236 while (!toBeInstalledGroups.isEmpty()) {
237 ListIterator<Group> iterator = toBeInstalledGroups.listIterator();
238 while (iterator.hasNext()) {
239 Group group = iterator.next();
240 boolean okToInstall = true;
241 for (Bucket bucket : group.getBuckets().getBucket()) {
242 for (Action action : bucket.getAction()) {
243 if (action.getAction().getImplementedInterface().getName()
244 .equals("org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.GroupActionCase")) {
245 Long groupId = ((GroupActionCase) (action.getAction())).getGroupAction().getGroupId();
246 if (!alreadyInstalledGroupids.contains(groupId)) {
259 final KeyedInstanceIdentifier<Group, GroupKey> groupIdent =
260 nodeIdent.child(Group.class, group.getKey());
261 this.provider.getGroupCommiter().add(groupIdent, group, nodeIdent);
262 alreadyInstalledGroupids.add(group.getGroupId().getValue());
268 List<Meter> meters = flowNode.get().getMeter() != null
269 ? flowNode.get().getMeter() : Collections.<Meter> emptyList();
270 for (Meter meter : meters) {
271 final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent =
272 nodeIdent.child(Meter.class, meter.getKey());
273 this.provider.getMeterCommiter().add(meterIdent, meter, nodeIdent);
276 List<Table> tables = flowNode.get().getTable() != null
277 ? flowNode.get().getTable() : Collections.<Table> emptyList();
278 for (Table table : tables) {
279 final KeyedInstanceIdentifier<Table, TableKey> tableIdent =
280 nodeIdent.child(Table.class, table.getKey());
281 List<Flow> flows = table.getFlow() != null ? table.getFlow() : Collections.<Flow> emptyList();
282 for (Flow flow : flows) {
283 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent =
284 tableIdent.child(Flow.class, flow.getKey());
285 this.provider.getFlowCommiter().add(flowIdent, flow, nodeIdent);
289 /* clean transaction */
295 private void reconciliationPreProcess(final InstanceIdentifier<FlowCapableNode> nodeIdent) {
298 List<InstanceIdentifier<StaleFlow>> staleFlowsToBeBulkDeleted = Lists.newArrayList();
299 List<InstanceIdentifier<StaleGroup>> staleGroupsToBeBulkDeleted = Lists.newArrayList();
300 List<InstanceIdentifier<StaleMeter>> staleMetersToBeBulkDeleted = Lists.newArrayList();
303 ReadOnlyTransaction trans = provider.getReadTranaction();
304 Optional<FlowCapableNode> flowNode = Optional.absent();
307 flowNode = trans.read(LogicalDatastoreType.CONFIGURATION, nodeIdent).get();
309 catch (Exception e) {
310 LOG.error("Reconciliation Pre-Processing Fail with read Config/DS for Node {} !", nodeIdent, e);
313 if (flowNode.isPresent()) {
315 LOG.debug("Proceeding with deletion of stale-marked Flows on switch {} using Openflow interface",
316 nodeIdent.toString());
317 /* Stale-Flows - Stale-marked Flows have to be removed first for safety */
318 List<Table> tables = flowNode.get().getTable() != null
319 ? flowNode.get().getTable() : Collections.<Table> emptyList();
320 for (Table table : tables) {
321 final KeyedInstanceIdentifier<Table, TableKey> tableIdent =
322 nodeIdent.child(Table.class, table.getKey());
323 List<StaleFlow> staleFlows = table.getStaleFlow() != null ? table.getStaleFlow() : Collections.<StaleFlow> emptyList();
324 for (StaleFlow staleFlow : staleFlows) {
326 FlowBuilder flowBuilder = new FlowBuilder(staleFlow);
327 Flow toBeDeletedFlow = flowBuilder.setId(staleFlow.getId()).build();
329 final KeyedInstanceIdentifier<Flow, FlowKey> flowIdent =
330 tableIdent.child(Flow.class, toBeDeletedFlow.getKey());
333 this.provider.getFlowCommiter().remove(flowIdent, toBeDeletedFlow, nodeIdent);
335 staleFlowsToBeBulkDeleted.add(getStaleFlowInstanceIdentifier(staleFlow, nodeIdent));
340 LOG.debug("Proceeding with deletion of stale-marked Groups for switch {} using Openflow interface",
341 nodeIdent.toString());
343 // TODO: Should we collate the futures of RPC-calls to be sure that groups are Flows are fully deleted
344 // before attempting to delete groups - just in case there are references
346 /* Stale-marked Groups - Can be deleted after flows */
347 List<StaleGroup> staleGroups = flowNode.get().getStaleGroup() != null
348 ? flowNode.get().getStaleGroup() : Collections.<StaleGroup> emptyList();
349 for (StaleGroup staleGroup : staleGroups) {
351 GroupBuilder groupBuilder = new GroupBuilder(staleGroup);
352 Group toBeDeletedGroup = groupBuilder.setGroupId(staleGroup.getGroupId()).build();
354 final KeyedInstanceIdentifier<Group, GroupKey> groupIdent =
355 nodeIdent.child(Group.class, toBeDeletedGroup.getKey());
357 this.provider.getGroupCommiter().add(groupIdent, toBeDeletedGroup, nodeIdent);
359 staleGroupsToBeBulkDeleted.add(getStaleGroupInstanceIdentifier(staleGroup, nodeIdent));
362 LOG.debug("Proceeding with deletion of stale-marked Meters for switch {} using Openflow interface",
363 nodeIdent.toString());
364 /* Stale-marked Meters - can be deleted anytime - so least priority */
365 List<StaleMeter> staleMeters = flowNode.get().getStaleMeter() != null
366 ? flowNode.get().getStaleMeter() : Collections.<StaleMeter> emptyList();
368 for (StaleMeter staleMeter : staleMeters) {
370 MeterBuilder meterBuilder = new MeterBuilder(staleMeter);
371 Meter toBeDeletedMeter = meterBuilder.setMeterId(staleMeter.getMeterId()).build();
373 final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent =
374 nodeIdent.child(Meter.class, toBeDeletedMeter.getKey());
377 this.provider.getMeterCommiter().add(meterIdent, toBeDeletedMeter, nodeIdent);
379 staleMetersToBeBulkDeleted.add(getStaleMeterInstanceIdentifier(staleMeter, nodeIdent));
383 /* clean transaction */
386 LOG.debug("Deleting all stale-marked flows/groups/meters of for switch {} in Configuration DS",
387 nodeIdent.toString());
388 // Now, do the bulk deletions
389 deleteDSStaleFlows(staleFlowsToBeBulkDeleted);
390 deleteDSStaleGroups(staleGroupsToBeBulkDeleted);
391 deleteDSStaleMeters(staleMetersToBeBulkDeleted);
396 private void deleteDSStaleFlows(List<InstanceIdentifier<StaleFlow>> flowsForBulkDelete){
397 ImmutableList.Builder<InstanceIdentifier<StaleFlow>> builder = ImmutableList.builder();
398 ImmutableList<InstanceIdentifier<StaleFlow>> bulkDelFlows = builder.addAll(flowsForBulkDelete.iterator()).build();
400 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
402 for (InstanceIdentifier<StaleFlow> staleFlowIId : flowsForBulkDelete){
403 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, staleFlowIId);
406 CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTransaction.submit();
407 handleStaleEntityDeletionResultFuture(submitFuture);
410 private void deleteDSStaleGroups(List<InstanceIdentifier<StaleGroup>> groupsForBulkDelete){
411 ImmutableList.Builder<InstanceIdentifier<StaleGroup>> builder = ImmutableList.builder();
412 ImmutableList<InstanceIdentifier<StaleGroup>> bulkDelGroups = builder.addAll(groupsForBulkDelete.iterator()).build();
414 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
416 for (InstanceIdentifier<StaleGroup> staleGroupIId : groupsForBulkDelete){
417 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, staleGroupIId);
420 CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTransaction.submit();
421 handleStaleEntityDeletionResultFuture(submitFuture);
425 private void deleteDSStaleMeters(List<InstanceIdentifier<StaleMeter>> metersForBulkDelete){
426 ImmutableList.Builder<InstanceIdentifier<StaleMeter>> builder = ImmutableList.builder();
427 ImmutableList<InstanceIdentifier<StaleMeter>> bulkDelGroups = builder.addAll(metersForBulkDelete.iterator()).build();
429 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
431 for (InstanceIdentifier<StaleMeter> staleMeterIId : metersForBulkDelete){
432 writeTransaction.delete(LogicalDatastoreType.CONFIGURATION, staleMeterIId);
435 CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTransaction.submit();
436 handleStaleEntityDeletionResultFuture(submitFuture);
442 private InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlow> getStaleFlowInstanceIdentifier(StaleFlow staleFlow, InstanceIdentifier<FlowCapableNode> nodeIdent) {
444 .child(Table.class, new TableKey(staleFlow.getTableId()))
445 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlow.class,
446 new StaleFlowKey(new FlowId(staleFlow.getId())));
449 private InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.StaleGroup> getStaleGroupInstanceIdentifier(StaleGroup staleGroup, InstanceIdentifier<FlowCapableNode> nodeIdent) {
451 .child(StaleGroup.class, new StaleGroupKey(new GroupId(staleGroup.getGroupId())));
455 private InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.StaleMeter> getStaleMeterInstanceIdentifier(StaleMeter staleMeter, InstanceIdentifier<FlowCapableNode> nodeIdent) {
457 .child(StaleMeter.class, new StaleMeterKey(new MeterId(staleMeter.getMeterId())));
461 private void handleStaleEntityDeletionResultFuture(CheckedFuture<Void, TransactionCommitFailedException> submitFuture) {
462 Futures.addCallback(submitFuture, new FutureCallback<Void>() {
464 public void onSuccess(Void result) { LOG.debug("Stale entity removal success");
468 public void onFailure(Throwable t) {
469 LOG.error("Stale entity removal failed {}", t);