2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
11 import java.util.ArrayList;
12 import java.util.HashSet;
13 import java.util.List;
14 import java.util.Map.Entry;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.CompletionService;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentMap;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.ExecutorCompletionService;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.TimeUnit;
25 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
26 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
27 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
29 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
30 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
31 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
32 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.equivalence.EquivalenceFabric;
33 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
34 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.EgressNatMapper;
35 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.ExternalMapper;
36 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
37 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
38 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.IngressNatMapper;
39 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
40 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
41 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
42 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
43 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener;
44 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
45 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.Action;
46 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
47 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
48 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
49 import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
50 import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
51 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
52 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
61 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
65 import com.google.common.base.Equivalence;
66 import com.google.common.base.Function;
67 import com.google.common.base.Optional;
68 import com.google.common.base.Preconditions;
69 import com.google.common.collect.Collections2;
70 import com.google.common.collect.ImmutableList;
71 import com.google.common.collect.Sets;
72 import com.google.common.util.concurrent.AsyncFunction;
73 import com.google.common.util.concurrent.CheckedFuture;
74 import com.google.common.util.concurrent.FutureCallback;
75 import com.google.common.util.concurrent.Futures;
76 import com.google.common.util.concurrent.ListenableFuture;
79 * Manage policies on switches by subscribing to updates from the
80 * policy resolver and information about endpoints from the endpoint
83 public class PolicyManager
84 implements SwitchListener, PolicyListener, EndpointListener {
85 private static final Logger LOG =
86 LoggerFactory.getLogger(PolicyManager.class);
88 private short tableOffset;
89 private static final short TABLEID_PORTSECURITY = 0;
90 private static final short TABLEID_INGRESS_NAT = 1;
91 private static final short TABLEID_SOURCE_MAPPER = 2;
92 private static final short TABLEID_DESTINATION_MAPPER = 3;
93 private static final short TABLEID_POLICY_ENFORCER = 4;
94 private static final short TABLEID_EGRESS_NAT = 5;
95 private static final short TABLEID_EXTERNAL_MAPPER = 6;
97 private final SwitchManager switchManager;
98 private final PolicyResolver policyResolver;
100 private final PolicyScope policyScope;
102 private final ScheduledExecutorService executor;
103 private final SingletonTask flowUpdateTask;
104 private final DataBroker dataBroker;
105 private final OfContext ofCtx;
107 * The flow tables that make up the processing pipeline
109 private List<? extends OfTable> flowPipeline;
112 * The delay before triggering the flow update task in response to an
113 * event in milliseconds.
115 private final static int FLOW_UPDATE_DELAY = 250;
/**
 * Creates the policy manager: validates the table offset, publishes the
 * OF-overlay subject feature definitions to the operational datastore
 * (when a data broker is available), registers the action definitions
 * with the policy resolver, builds the flow pipeline and registers this
 * object as listener with the resolver, the switch manager and the
 * endpoint manager.
 *
 * @param dataBroker      datastore access; may be {@code null}, in which
 *                        case nothing is written to the datastore
 * @param policyResolver  source of the current policy
 * @param switchManager   source of switch state; may be {@code null}
 * @param endpointManager source of endpoint updates
 * @param rpcRegistry     handed through to the {@link OfContext}
 * @param executor        executor used for the flow update tasks
 * @param tableOffset     offset added to every relative pipeline table id
 * @throws IllegalArgumentException if the highest table id resulting from
 *                                  {@code tableOffset} is rejected
 */
public PolicyManager(DataBroker dataBroker,
                     PolicyResolver policyResolver,
                     SwitchManager switchManager,
                     EndpointManager endpointManager,
                     RpcProviderRegistry rpcRegistry,
                     ScheduledExecutorService executor,
                     short tableOffset) {
    this.switchManager = switchManager;
    this.executor = executor;
    this.policyResolver = policyResolver;
    this.dataBroker = dataBroker;
    this.tableOffset = tableOffset;
    try {
        // to validate against model
        verifyMaxTableId(tableOffset);
    } catch (IllegalArgumentException e) {
        // Exception messages are not SLF4J templates: spell the offending
        // value out instead of leaving a "{}" placeholder in the text.
        throw new IllegalArgumentException("Failed to start OF-Overlay renderer.\n"
                + "Max. table ID would be out of range for table offset " + tableOffset
                + ". Check config-subsystem.", e);
    }

    if (dataBroker != null) {
        WriteTransaction t = dataBroker.newWriteOnlyTransaction();
        t.put(LogicalDatastoreType.OPERATIONAL,
              InstanceIdentifier
                  .builder(SubjectFeatureDefinitions.class)
                  .build(),
              SubjectFeatures.OF_OVERLAY_FEATURES);
        // NOTE(review): the submit call is not visible in this extract of the
        // file; restored because the put has no effect without it - confirm.
        t.submit();
    }

    for (Entry<ActionDefinitionId, Action> entry : SubjectFeatures.getActions().entrySet()) {
        policyResolver.registerActionDefinitions(entry.getKey(), entry.getValue());
    }

    ofCtx = new OfContext(dataBroker, rpcRegistry,
                          this, policyResolver, switchManager,
                          endpointManager, executor);

    flowPipeline = createFlowPipeline();

    policyScope = policyResolver.registerListener(this);
    if (switchManager != null) {
        switchManager.registerListener(this);
    }
    endpointManager.registerListener(this);

    flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
    // NOTE(review): an initial scheduleUpdate() is assumed here (one short
    // line is missing from this extract at this position) - confirm.
    scheduleUpdate();

    LOG.debug("Initialized OFOverlay policy manager");
}
169 private List<? extends OfTable> createFlowPipeline() {
170 // TODO - PORTSECURITY is kept in table 0.
171 // According to openflow spec,processing on vSwitch always starts from table 0.
172 // Packets will be droped if table 0 is empty.
173 // Alternative workaround - table-miss flow entries in table 0.
174 return ImmutableList.of(new PortSecurity(ofCtx, (short) 0),
175 new GroupTable(ofCtx),
176 new IngressNatMapper(ofCtx, getTABLEID_INGRESS_NAT()),
177 new SourceMapper(ofCtx, getTABLEID_SOURCE_MAPPER()),
178 new DestinationMapper(ofCtx, getTABLEID_DESTINATION_MAPPER()),
179 new PolicyEnforcer(ofCtx, getTABLEID_POLICY_ENFORCER()),
180 new EgressNatMapper(ofCtx, getTABLEID_EGRESS_NAT()),
181 new ExternalMapper(ofCtx, getTABLEID_EXTERNAL_MAPPER())
186 * @param tableOffset - new offset value
187 * @return ListenableFuture<List> - to indicate that tables have been synced
189 public ListenableFuture<Void> changeOpenFlowTableOffset(final short tableOffset) {
191 verifyMaxTableId(tableOffset);
192 } catch (IllegalArgumentException e) {
193 LOG.error("Cannot update table offset. Max. table ID would be out of range.\n{}", e);
194 // TODO - invalid offset value remains in conf DS
195 // It's not possible to validate offset value by using constrains in model,
196 // because number of tables in pipeline varies.
197 return Futures.immediateFuture(null);
199 List<Short> tableIDs = getTableIDs();
200 this.tableOffset = tableOffset;
201 return Futures.transform(removeUnusedTables(tableIDs), new Function<Void, Void>() {
204 public Void apply(Void tablesRemoved) {
205 flowPipeline = createFlowPipeline();
213 * @param tableIDs - IDs of tables to delete
214 * @return ListenableFuture<Void> - which will be filled when clearing is done
216 private ListenableFuture<Void> removeUnusedTables(final List<Short> tableIDs) {
217 List<ListenableFuture<Void>> checkList = new ArrayList<>();
218 final ReadWriteTransaction rwTx = dataBroker.newReadWriteTransaction();
219 for (Short tableId : tableIDs) {
220 for (NodeId nodeId : switchManager.getReadySwitches()) {
221 final InstanceIdentifier<Table> tablePath = FlowUtils.createTablePath(nodeId, tableId);
222 checkList.add(deteleTableIfExists(rwTx, tablePath));
225 ListenableFuture<List<Void>> allAsListFuture = Futures.allAsList(checkList);
226 return Futures.transform(allAsListFuture, new AsyncFunction<List<Void>, Void>() {
229 public ListenableFuture<Void> apply(List<Void> readyToSubmit) {
230 return rwTx.submit();
235 private List<Short> getTableIDs() {
236 List<Short> tableIds = new ArrayList<>();
237 tableIds.add(getTABLEID_PORTSECURITY());
238 tableIds.add(getTABLEID_INGRESS_NAT());
239 tableIds.add(getTABLEID_SOURCE_MAPPER());
240 tableIds.add(getTABLEID_DESTINATION_MAPPER());
241 tableIds.add(getTABLEID_POLICY_ENFORCER());
242 tableIds.add(getTABLEID_EGRESS_NAT());
243 tableIds.add(getTABLEID_EXTERNAL_MAPPER());
247 private ListenableFuture<Void> deteleTableIfExists(final ReadWriteTransaction rwTx, final InstanceIdentifier<Table> tablePath){
248 return Futures.transform(rwTx.read(LogicalDatastoreType.CONFIGURATION, tablePath), new Function<Optional<Table>, Void>() {
251 public Void apply(Optional<Table> optTable) {
252 if(optTable.isPresent()){
253 rwTx.delete(LogicalDatastoreType.CONFIGURATION, tablePath);
263 public short getTABLEID_PORTSECURITY() {
264 return (short)(tableOffset+TABLEID_PORTSECURITY);
268 public short getTABLEID_INGRESS_NAT() {
269 return (short)(tableOffset+TABLEID_INGRESS_NAT);
273 public short getTABLEID_SOURCE_MAPPER() {
274 return (short)(tableOffset+TABLEID_SOURCE_MAPPER);
278 public short getTABLEID_DESTINATION_MAPPER() {
279 return (short)(tableOffset+TABLEID_DESTINATION_MAPPER);
283 public short getTABLEID_POLICY_ENFORCER() {
284 return (short)(tableOffset+TABLEID_POLICY_ENFORCER);
288 public short getTABLEID_EGRESS_NAT() {
289 return (short)(tableOffset+TABLEID_EGRESS_NAT);
293 public short getTABLEID_EXTERNAL_MAPPER() {
294 return (short)(tableOffset+TABLEID_EXTERNAL_MAPPER);
298 public TableId verifyMaxTableId(short tableOffset) {
299 return new TableId((short)(tableOffset+TABLEID_EXTERNAL_MAPPER));
// Callback receiving the id of a switch; presumably the "switch ready"
// notification of SwitchListener, which this class implements.
// NOTE(review): the method body is not visible in this extract - confirm
// what the callback triggers.
303 public void switchReady(final NodeId nodeId) {
// Callback receiving the id of a switch; presumably the "switch removed"
// notification of SwitchListener. Per the TODO below, flows of the removed
// switch are not purged yet.
// NOTE(review): the rest of the method body is not visible in this extract.
308 public void switchRemoved(NodeId sw) {
309 // XXX TODO purge switch flows
// Callback receiving the id of a switch; presumably the "switch updated"
// notification of SwitchListener.
// NOTE(review): the method body is not visible in this extract - confirm
// what the callback triggers.
314 public void switchUpdated(NodeId sw) {
// Callback receiving the key of an endpoint; presumably part of
// EndpointListener, which this class implements.
// NOTE(review): the method body is not visible in this extract - confirm
// what the callback triggers.
323 public void endpointUpdated(EpKey epKey) {
// Callback receiving a node id together with an endpoint key; presumably
// part of EndpointListener.
// NOTE(review): the method body is not visible in this extract - confirm
// what the callback triggers.
328 public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey){
// Callback receiving an endpoint-group key and an endpoint key. It adds the
// group (tenant id + endpoint-group id) to the policy scope obtained when
// this manager registered with the policy resolver.
// NOTE(review): anything after the addToScope call is not visible in this
// extract - confirm.
333 public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
334 policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
// Callback receiving the set of endpoint-group keys named updatedConsumers;
// presumably the PolicyListener notification (this class implements
// PolicyListener).
// NOTE(review): the method body is not visible in this extract - confirm
// what the callback triggers.
343 public void policyUpdated(Set<EgKey> updatedConsumers) {
// NOTE(review): the method body is not visible in this extract, so whether
// the given learning mode is stored or acted upon cannot be told from here.
352 * Set the learning mode to the specified value
353 * @param learningMode the learning mode to set
355 public void setLearningMode(LearningMode learningMode) {
363 public class FlowMap{
364 private ConcurrentMap<InstanceIdentifier<Table>, TableBuilder> flowMap = new ConcurrentHashMap<>();
369 public TableBuilder getTableForNode(NodeId nodeId, short tableId) {
370 InstanceIdentifier<Table> tableIid = FlowUtils.createTablePath(nodeId, tableId);
371 if(this.flowMap.get(tableIid) == null) {
372 this.flowMap.put(tableIid, new TableBuilder().setId(tableId));
373 this.flowMap.get(tableIid).setFlow(new ArrayList<Flow>());
375 return this.flowMap.get(tableIid);
378 public void writeFlow(NodeId nodeId, short tableId, Flow flow) {
379 TableBuilder tableBuilder = this.getTableForNode(nodeId, tableId);
380 // transforming List<Flow> to Set (with customized equals/hashCode) to eliminate duplicate entries
381 List<Flow> flows = tableBuilder.getFlow();
382 Set<Equivalence.Wrapper<Flow>> wrappedFlows =
383 new HashSet<>(Collections2.transform(flows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
385 Equivalence.Wrapper<Flow> wFlow = EquivalenceFabric.FLOW_EQUIVALENCE.wrap(flow);
387 if (!wrappedFlows.contains(wFlow)) {
388 tableBuilder.getFlow().add(Preconditions.checkNotNull(flow));
390 LOG.debug("Flow already exists in FlowMap - {}", flow);
394 public void commitToDataStore() {
395 if (dataBroker != null) {
396 for( Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
399 * Get the currently configured flows for
402 updateFlowTable(entry);
403 } catch (Exception e) {
404 LOG.warn("Couldn't read flow table {}", entry.getKey());
410 private void updateFlowTable(Entry<InstanceIdentifier<Table>,
411 TableBuilder> entry) throws Exception {
413 Set<Flow> update = new HashSet<>(entry.getValue().getFlow());
414 // flows currently in the table
415 Set<Flow> curr = new HashSet<>();
417 ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
419 t.read(LogicalDatastoreType.CONFIGURATION, entry.getKey()).get();
422 Table currentTable = r.get();
423 curr = new HashSet<>(currentTable.getFlow());
426 // Sets with custom equivalence rules
427 Set<Equivalence.Wrapper<Flow>> oldFlows =
428 new HashSet<>(Collections2.transform(curr, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
429 Set<Equivalence.Wrapper<Flow>> updatedFlows =
430 new HashSet<>(Collections2.transform(update, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
432 // what is still there but was not updated, needs to be deleted
433 Sets.SetView<Equivalence.Wrapper<Flow>> deletions =
434 Sets.difference(oldFlows, updatedFlows);
435 // new flows (they were not there before)
436 Sets.SetView<Equivalence.Wrapper<Flow>> additions =
437 Sets.difference(updatedFlows, oldFlows);
439 if (!deletions.isEmpty()) {
440 for (Equivalence.Wrapper<Flow> wf: deletions) {
443 t.delete(LogicalDatastoreType.CONFIGURATION,
444 FlowUtils.createFlowPath(entry.getKey(), f.getId()));
448 if (!additions.isEmpty()) {
449 for (Equivalence.Wrapper<Flow> wf: additions) {
452 t.put(LogicalDatastoreType.CONFIGURATION,
453 FlowUtils.createFlowPath(entry.getKey(), f.getId()), f, true);
457 CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
458 Futures.addCallback(f, new FutureCallback<Void>() {
460 public void onFailure(Throwable t) {
461 LOG.error("Could not write flow table {}", t);
465 public void onSuccess(Void result) {
466 LOG.debug("Flow table updated.");
473 private void scheduleUpdate() {
474 if (switchManager != null) {
475 LOG.trace("Scheduling flow update task");
476 flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
481 * Update the flows on a particular switch
483 private class SwitchFlowUpdateTask implements Callable<Void> {
484 private FlowMap flowMap;
486 public SwitchFlowUpdateTask(FlowMap flowMap) {
488 this.flowMap = flowMap;
492 public Void call() throws Exception {
493 for (NodeId node : switchManager.getReadySwitches()) {
494 PolicyInfo info = policyResolver.getCurrentPolicy();
497 for (OfTable table : flowPipeline) {
499 table.update(node, info, flowMap);
500 } catch (Exception e) {
501 LOG.error("Failed to write flow table {}",
502 table.getClass().getSimpleName(), e);
511 * Update all flows on all switches as needed. Note that this will block
512 * one of the threads on the executor.
514 private class FlowUpdateTask implements Runnable {
517 LOG.debug("Beginning flow update task");
519 CompletionService<Void> ecs
520 = new ExecutorCompletionService<>(executor);
523 FlowMap flowMap = new FlowMap();
525 SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(flowMap);
529 for (int i = 0; i < n; i++) {
532 flowMap.commitToDataStore();
533 } catch (InterruptedException | ExecutionException e) {
534 LOG.error("Failed to update flow tables", e);
537 LOG.debug("Flow update completed");