2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
11 import java.io.Closeable;
12 import java.io.IOException;
13 import java.util.ArrayList;
14 import java.util.Collection;
15 import java.util.List;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.CompletionService;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ExecutorCompletionService;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.concurrent.TimeUnit;
23 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
24 import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
25 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
26 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
27 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
28 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
30 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
31 import org.opendaylight.groupbasedpolicy.dto.EgKey;
32 import org.opendaylight.groupbasedpolicy.dto.EpKey;
33 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.endpoint.EndpointManager;
34 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
35 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.EgressNatMapper;
36 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.ExternalMapper;
37 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
38 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
39 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.IngressNatMapper;
40 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
41 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
42 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
43 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
44 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener;
45 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
46 import org.opendaylight.groupbasedpolicy.util.DataStoreHelper;
47 import org.opendaylight.groupbasedpolicy.util.IidFactory;
48 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.renderer.rev151103.renderers.renderer.interests.followed.tenants.followed.tenant.FollowedEndpointGroup;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.renderer.rev151103.renderers.renderer.interests.followed.tenants.followed.tenant.FollowedEndpointGroupBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.resolved.policy.rev150828.ResolvedPolicies;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.resolved.policy.rev150828.resolved.policies.ResolvedPolicy;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
57 import org.opendaylight.yangtools.concepts.ListenerRegistration;
58 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
62 import com.google.common.base.Function;
63 import com.google.common.base.Optional;
64 import com.google.common.collect.ImmutableList;
65 import com.google.common.util.concurrent.AsyncFunction;
66 import com.google.common.util.concurrent.Futures;
67 import com.google.common.util.concurrent.ListenableFuture;
70 * Manage policies on switches by subscribing to updates from the
71 * policy resolver and information about endpoints from the endpoint
74 public class PolicyManager
75 implements SwitchListener, EndpointListener, DataTreeChangeListener<ResolvedPolicy>, Closeable {
76 private static final Logger LOG =
77 LoggerFactory.getLogger(PolicyManager.class);
79 private short tableOffset;
80 private static final short TABLEID_PORTSECURITY = 0;
81 private static final short TABLEID_INGRESS_NAT = 1;
82 private static final short TABLEID_SOURCE_MAPPER = 2;
83 private static final short TABLEID_DESTINATION_MAPPER = 3;
84 private static final short TABLEID_POLICY_ENFORCER = 4;
85 private static final short TABLEID_EGRESS_NAT = 5;
86 private static final short TABLEID_EXTERNAL_MAPPER = 6;
88 private final SwitchManager switchManager;
89 private final EndpointManager endpointManager;
91 private final ListenerRegistration<PolicyManager> registerDataTreeChangeListener;
93 private final ScheduledExecutorService executor;
94 private final SingletonTask flowUpdateTask;
95 private final DataBroker dataBroker;
98 * The delay before triggering the flow update task in response to an
99 * event in milliseconds.
101 private final static int FLOW_UPDATE_DELAY = 250;
103 public PolicyManager(DataBroker dataBroker,
104 SwitchManager switchManager,
105 EndpointManager endpointManager,
106 RpcProviderRegistry rpcRegistry,
107 ScheduledExecutorService executor,
110 this.switchManager = switchManager;
111 this.executor = executor;
112 this.dataBroker = dataBroker;
113 this.tableOffset = tableOffset;
115 // to validate against model
116 verifyMaxTableId(tableOffset);
117 } catch (IllegalArgumentException e) {
118 throw new IllegalArgumentException("Failed to start OF-Overlay renderer\n."
119 + "Max. table ID would be out of range. Check config-subsystem.\n{}", e);
122 if (dataBroker != null) {
123 registerDataTreeChangeListener = dataBroker.registerDataTreeChangeListener(
124 new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL,
125 InstanceIdentifier.builder(ResolvedPolicies.class).child(ResolvedPolicy.class).build()),
128 registerDataTreeChangeListener = null;
129 LOG.error("DataBroker is null. Listener for {} was not registered.",
130 ResolvedPolicy.class.getCanonicalName());
132 if (switchManager != null)
133 switchManager.registerListener(this);
134 this.endpointManager = endpointManager;
135 endpointManager.registerListener(this);
137 flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
140 LOG.debug("Initialized OFOverlay policy manager");
143 private List<? extends OfTable> createFlowPipeline(OfContext ofCtx) {
144 // TODO - PORTSECURITY is kept in table 0.
145 // According to openflow spec,processing on vSwitch always starts from table 0.
146 // Packets will be droped if table 0 is empty.
147 // Alternative workaround - table-miss flow entries in table 0.
148 return ImmutableList.of(new PortSecurity(ofCtx, (short) 0),
149 new GroupTable(ofCtx),
150 new IngressNatMapper(ofCtx, getTABLEID_INGRESS_NAT()),
151 new SourceMapper(ofCtx, getTABLEID_SOURCE_MAPPER()),
152 new DestinationMapper(ofCtx, getTABLEID_DESTINATION_MAPPER()),
153 new PolicyEnforcer(ofCtx, getTABLEID_POLICY_ENFORCER()),
154 new EgressNatMapper(ofCtx, getTABLEID_EGRESS_NAT()),
155 new ExternalMapper(ofCtx, getTABLEID_EXTERNAL_MAPPER())
160 * @param tableOffset the new offset value
161 * @return {@link ListenableFuture} to indicate that tables have been synced
163 public ListenableFuture<Void> changeOpenFlowTableOffset(final short tableOffset) {
165 verifyMaxTableId(tableOffset);
166 } catch (IllegalArgumentException e) {
167 LOG.error("Cannot update table offset. Max. table ID would be out of range.\n{}", e);
168 // TODO - invalid offset value remains in conf DS
169 // It's not possible to validate offset value by using constrains in model,
170 // because number of tables in pipeline varies.
171 return Futures.immediateFuture(null);
173 List<Short> tableIDs = getTableIDs();
174 this.tableOffset = tableOffset;
175 return Futures.transform(removeUnusedTables(tableIDs), new Function<Void, Void>() {
178 public Void apply(Void tablesRemoved) {
186 * @param tableIDs - IDs of tables to delete
187 * @return ListenableFuture<Void> - which will be filled when clearing is done
189 private ListenableFuture<Void> removeUnusedTables(final List<Short> tableIDs) {
190 List<ListenableFuture<Void>> checkList = new ArrayList<>();
191 final ReadWriteTransaction rwTx = dataBroker.newReadWriteTransaction();
192 for (Short tableId : tableIDs) {
193 for (NodeId nodeId : switchManager.getReadySwitches()) {
194 final InstanceIdentifier<Table> tablePath = FlowUtils.createTablePath(nodeId, tableId);
195 checkList.add(deteleTableIfExists(rwTx, tablePath));
198 ListenableFuture<List<Void>> allAsListFuture = Futures.allAsList(checkList);
199 return Futures.transform(allAsListFuture, new AsyncFunction<List<Void>, Void>() {
202 public ListenableFuture<Void> apply(List<Void> readyToSubmit) {
203 return rwTx.submit();
208 private List<Short> getTableIDs() {
209 List<Short> tableIds = new ArrayList<>();
210 tableIds.add(getTABLEID_PORTSECURITY());
211 tableIds.add(getTABLEID_INGRESS_NAT());
212 tableIds.add(getTABLEID_SOURCE_MAPPER());
213 tableIds.add(getTABLEID_DESTINATION_MAPPER());
214 tableIds.add(getTABLEID_POLICY_ENFORCER());
215 tableIds.add(getTABLEID_EGRESS_NAT());
216 tableIds.add(getTABLEID_EXTERNAL_MAPPER());
220 private ListenableFuture<Void> deteleTableIfExists(final ReadWriteTransaction rwTx, final InstanceIdentifier<Table> tablePath){
221 return Futures.transform(rwTx.read(LogicalDatastoreType.CONFIGURATION, tablePath), new Function<Optional<Table>, Void>() {
224 public Void apply(Optional<Table> optTable) {
225 if(optTable.isPresent()){
226 rwTx.delete(LogicalDatastoreType.CONFIGURATION, tablePath);
236 public short getTABLEID_PORTSECURITY() {
237 return (short)(tableOffset+TABLEID_PORTSECURITY);
241 public short getTABLEID_INGRESS_NAT() {
242 return (short)(tableOffset+TABLEID_INGRESS_NAT);
246 public short getTABLEID_SOURCE_MAPPER() {
247 return (short)(tableOffset+TABLEID_SOURCE_MAPPER);
251 public short getTABLEID_DESTINATION_MAPPER() {
252 return (short)(tableOffset+TABLEID_DESTINATION_MAPPER);
256 public short getTABLEID_POLICY_ENFORCER() {
257 return (short)(tableOffset+TABLEID_POLICY_ENFORCER);
261 public short getTABLEID_EGRESS_NAT() {
262 return (short)(tableOffset+TABLEID_EGRESS_NAT);
266 public short getTABLEID_EXTERNAL_MAPPER() {
267 return (short)(tableOffset+TABLEID_EXTERNAL_MAPPER);
271 public TableId verifyMaxTableId(short tableOffset) {
272 return new TableId((short)(tableOffset+TABLEID_EXTERNAL_MAPPER));
276 public void switchReady(final NodeId nodeId) {
281 public void switchRemoved(NodeId sw) {
282 // XXX TODO purge switch flows
287 public void switchUpdated(NodeId sw) {
296 public void endpointUpdated(EpKey epKey) {
301 public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey){
306 public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
307 // TODO a renderer should remove followed-EPG and followed-tenant at some point
308 if (dataBroker == null) {
309 LOG.error("DataBroker is null. Cannot write followed-epg {}", epKey);
312 WriteTransaction wTx = dataBroker.newWriteOnlyTransaction();
313 FollowedEndpointGroup followedEpg = new FollowedEndpointGroupBuilder().setId(egKey.getEgId()).build();
314 wTx.put(LogicalDatastoreType.OPERATIONAL, IidFactory.followedEndpointgroupIid(OFOverlayRenderer.RENDERER_NAME,
315 egKey.getTenantId(), egKey.getEgId()), followedEpg, true);
316 DataStoreHelper.submitToDs(wTx);
321 // DataTreeChangeListener<ResolvedPolicy>
325 public void onDataTreeChanged(Collection<DataTreeModification<ResolvedPolicy>> changes) {
334 * Set the learning mode to the specified value
335 * @param learningMode the learning mode to set
337 public void setLearningMode(LearningMode learningMode) {
345 private void scheduleUpdate() {
346 if (switchManager != null) {
347 LOG.trace("Scheduling flow update task");
348 flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
353 * Update the flows on a particular switch
355 private class SwitchFlowUpdateTask implements Callable<Void> {
356 private final OfWriter ofWriter;
358 public SwitchFlowUpdateTask(OfWriter ofWriter) {
359 this.ofWriter = ofWriter;
363 public Void call() throws Exception {
364 OfContext ofCtx = new OfContext(dataBroker, PolicyManager.this, switchManager, endpointManager, executor);
365 if (ofCtx.getCurrentPolicy() == null)
367 List<? extends OfTable> flowPipeline = createFlowPipeline(ofCtx);
368 for (NodeId node : switchManager.getReadySwitches()) {
369 for (OfTable table : flowPipeline) {
371 table.sync(node, ofWriter);
372 } catch (Exception e) {
373 LOG.error("Failed to write Openflow table {}",
374 table.getClass().getSimpleName(), e);
383 * Update all flows on all switches as needed. Note that this will block
384 * one of the threads on the executor.
386 private class FlowUpdateTask implements Runnable {
389 LOG.debug("Beginning flow update task");
391 CompletionService<Void> ecs
392 = new ExecutorCompletionService<>(executor);
395 OfWriter ofWriter = new OfWriter();
397 SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(ofWriter);
401 for (int i = 0; i < n; i++) {
404 ofWriter.commitToDataStore(dataBroker);
405 } catch (InterruptedException | ExecutionException e) {
406 LOG.error("Failed to update flow tables", e);
409 LOG.debug("Flow update completed");
414 public void close() throws IOException {
415 if (registerDataTreeChangeListener != null)
416 registerDataTreeChangeListener.close();
417 // TODO unregister classifier and action instance validators