2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Verify;
12 import com.google.common.collect.Iterators;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.JdkFutureAdapters;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.TimerTask;
19 import java.util.Collection;
20 import java.util.Collections;
21 import java.util.HashSet;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.Objects;
25 import java.util.Optional;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.Future;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.TimeoutException;
30 import javax.annotation.Nonnull;
31 import javax.annotation.Nullable;
32 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
33 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
34 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
35 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
36 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
37 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
38 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
39 import org.opendaylight.openflowplugin.api.ConnectionException;
40 import org.opendaylight.openflowplugin.api.OFConstants;
41 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
42 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
44 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
45 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
46 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
47 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
48 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
49 import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
50 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
51 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
52 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
53 import org.opendaylight.openflowplugin.api.openflow.lifecycle.MastershipChangeListener;
54 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
55 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
56 import org.opendaylight.openflowplugin.api.openflow.registry.ItemLifeCycleRegistry;
57 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
58 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor;
59 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryKey;
60 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
61 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
62 import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleKeeper;
63 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
64 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
65 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
66 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
67 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
68 import org.opendaylight.openflowplugin.extension.api.exception.ConversionException;
69 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
70 import org.opendaylight.openflowplugin.impl.common.ItemLifeCycleSourceImpl;
71 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
72 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
73 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
74 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
75 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
76 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
77 import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
78 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
79 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
80 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
81 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
82 import org.opendaylight.openflowplugin.impl.util.MatchUtil;
83 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
84 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.Match;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
102 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
103 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
104 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
105 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
106 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
107 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
108 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketIn;
109 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
110 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder;
111 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
112 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
113 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
114 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService;
115 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput;
116 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder;
117 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
118 import org.opendaylight.yangtools.yang.binding.DataContainer;
119 import org.opendaylight.yangtools.yang.binding.DataObject;
120 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
121 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
122 import org.opendaylight.yangtools.yang.common.RpcResult;
123 import org.slf4j.Logger;
124 import org.slf4j.LoggerFactory;
126 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper {
128 private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
130 // TODO: drain factor should be parametrized
131 private static final float REJECTED_DRAIN_FACTOR = 0.25f;
132 // TODO: low water mark factor should be parametrized
133 private static final float LOW_WATERMARK_FACTOR = 0.75f;
134 // TODO: high water mark factor should be parametrized
135 private static final float HIGH_WATERMARK_FACTOR = 0.95f;
// Timeout in seconds after which we give up on propagating the role
138 private static final int SET_ROLE_TIMEOUT = 10;
// Timeout in milliseconds after which we give up on initializing the device
141 private static final int DEVICE_INIT_TIMEOUT = 9000;
143 private static final int LOW_WATERMARK = 1000;
144 private static final int HIGH_WATERMARK = 2000;
146 private final MultipartWriterProvider writerProvider;
147 private final HashedWheelTimer hashedWheelTimer;
148 private final DeviceState deviceState;
149 private final DataBroker dataBroker;
150 private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
151 private final MessageSpy messageSpy;
152 private final ItemLifeCycleKeeper flowLifeCycleKeeper;
153 private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
154 private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
155 private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
156 private final TranslatorLibrary translatorLibrary;
157 private final ItemLifeCycleRegistry itemLifeCycleSourceRegistry;
158 private final ConvertorExecutor convertorExecutor;
159 private final DeviceInitializerProvider deviceInitializerProvider;
160 private final PacketInRateLimiter packetInLimiter;
161 private final DeviceInfo deviceInfo;
162 private final boolean skipTableFeatures;
163 private final boolean switchFeaturesMandatory;
164 private final boolean isFlowRemovedNotificationOn;
165 private final boolean useSingleLayerSerialization;
166 private NotificationPublishService notificationPublishService;
167 private TransactionChainManager transactionChainManager;
168 private DeviceFlowRegistry deviceFlowRegistry;
169 private DeviceGroupRegistry deviceGroupRegistry;
170 private DeviceMeterRegistry deviceMeterRegistry;
171 private ExtensionConverterProvider extensionConverterProvider;
172 private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
173 private SalRoleService salRoleService;
174 private boolean initialized;
175 private boolean hasState;
176 private boolean isInitialTransactionSubmitted;
177 private volatile ConnectionContext primaryConnectionContext;
178 private volatile ContextState state;
181 @Nonnull final ConnectionContext primaryConnectionContext,
182 @Nonnull final DataBroker dataBroker,
183 @Nonnull final MessageSpy messageSpy,
184 @Nonnull final TranslatorLibrary translatorLibrary,
185 final ConvertorExecutor convertorExecutor,
186 final boolean skipTableFeatures,
187 final HashedWheelTimer hashedWheelTimer,
188 final boolean useSingleLayerSerialization,
189 final DeviceInitializerProvider deviceInitializerProvider,
190 final boolean isFlowRemovedNotificationOn,
191 final boolean switchFeaturesMandatory) {
193 this.primaryConnectionContext = primaryConnectionContext;
194 this.deviceInfo = primaryConnectionContext.getDeviceInfo();
195 this.hashedWheelTimer = hashedWheelTimer;
196 this.deviceInitializerProvider = deviceInitializerProvider;
197 this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
198 this.switchFeaturesMandatory = switchFeaturesMandatory;
199 this.deviceState = new DeviceStateImpl();
200 this.dataBroker = dataBroker;
201 this.messageSpy = messageSpy;
203 this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
204 /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
206 this.translatorLibrary = translatorLibrary;
207 this.portStatusTranslator = translatorLibrary.lookupTranslator(
208 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
209 this.packetInTranslator = translatorLibrary.lookupTranslator(
210 new TranslatorKey(deviceInfo.getVersion(), org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
211 .PacketIn.class.getName()));
212 this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
213 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
215 this.itemLifeCycleSourceRegistry = new ItemLifeCycleRegistryImpl();
216 this.flowLifeCycleKeeper = new ItemLifeCycleSourceImpl();
217 this.itemLifeCycleSourceRegistry.registerLifeCycleSource(flowLifeCycleKeeper);
218 this.state = ContextState.INITIALIZATION;
219 this.convertorExecutor = convertorExecutor;
220 this.skipTableFeatures = skipTableFeatures;
221 this.useSingleLayerSerialization = useSingleLayerSerialization;
222 this.initialized = false;
223 writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
227 public boolean initialSubmitTransaction() {
228 return (initialized &&(isInitialTransactionSubmitted =
229 transactionChainManager.initialSubmitWriteTransaction()));
233 public DeviceState getDeviceState() {
238 public ReadOnlyTransaction getReadTransaction() {
239 return dataBroker.newReadOnlyTransaction();
243 public boolean isTransactionsEnabled() {
244 return isInitialTransactionSubmitted;
248 public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
249 final InstanceIdentifier<T> path,
252 transactionChainManager.writeToTransaction(store, path, data, false);
257 public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
258 final InstanceIdentifier<T> path,
261 transactionChainManager.writeToTransaction(store, path, data, true);
266 public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store, final InstanceIdentifier<T> path) {
268 transactionChainManager.addDeleteOperationTotTxChain(store, path);
273 public boolean submitTransaction() {
274 return initialized && transactionChainManager.submitWriteTransaction();
278 public ConnectionContext getPrimaryConnectionContext() {
279 return primaryConnectionContext;
283 public DeviceFlowRegistry getDeviceFlowRegistry() {
284 return deviceFlowRegistry;
288 public DeviceGroupRegistry getDeviceGroupRegistry() {
289 return deviceGroupRegistry;
293 public DeviceMeterRegistry getDeviceMeterRegistry() {
294 return deviceMeterRegistry;
298 public void processReply(final OfHeader ofHeader) {
299 messageSpy.spyMessage(
300 ofHeader.getImplementedInterface(),
301 (ofHeader instanceof Error)
302 ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
303 : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
307 public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
308 ofHeaderList.forEach(header -> messageSpy.spyMessage(
309 header.getImplementedInterface(),
310 (header instanceof Error)
311 ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
312 : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
316 public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
317 //1. translate to general flow (table, priority, match, cookie)
318 final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification =
319 flowRemovedTranslator.translate(flowRemoved, deviceInfo, null);
321 if (isFlowRemovedNotificationOn) {
322 // Trigger off a notification
323 notificationPublishService.offerNotification(flowRemovedNotification);
326 final ItemLifecycleListener itemLifecycleListener = flowLifeCycleKeeper.getItemLifecycleListener();
327 if (itemLifecycleListener != null) {
328 //2. create registry key
329 final FlowRegistryKey flowRegKey = FlowRegistryKeyFactory.create(getDeviceInfo().getVersion(), flowRemovedNotification);
331 final FlowDescriptor flowDescriptor = deviceFlowRegistry.retrieveDescriptor(flowRegKey);
332 //4. if flowId present:
333 if (flowDescriptor != null) {
334 // a) construct flow path
335 final KeyedInstanceIdentifier<Flow, FlowKey> flowPath = getDeviceInfo().getNodeInstanceIdentifier()
336 .augmentation(FlowCapableNode.class)
337 .child(Table.class, flowDescriptor.getTableKey())
338 .child(Flow.class, new FlowKey(flowDescriptor.getFlowId()));
339 // b) notify listener
340 itemLifecycleListener.onRemoved(flowPath);
342 LOG.debug("flow id not found: nodeId={} tableId={}, priority={}",
343 getDeviceInfo().getNodeId(), flowRegKey.getTableId(), flowRemovedNotification.getPriority());
349 public void processPortStatusMessage(final PortStatusMessage portStatus) {
350 messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
354 writePortStatusMessage(portStatus);
356 } catch (final Exception e) {
357 LOG.warn("Error processing port status message for port {} on device {}",
358 portStatus.getPortNo(), getDeviceInfo().getLOGValue(), e);
360 } else if (!hasState) {
361 primaryConnectionContext.handlePortStatusMessage(portStatus);
365 private void writePortStatusMessage(final PortStatus portStatusMessage) {
366 final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
367 .translate(portStatusMessage, getDeviceInfo(), null);
369 final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
370 .getNodeInstanceIdentifier()
371 .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
372 .nodeConnectorIdfromDatapathPortNo(
373 deviceInfo.getDatapathId(),
374 portStatusMessage.getPortNo(),
375 OpenflowVersion.get(deviceInfo.getVersion()))));
377 if (PortReason.OFPPRADD.equals(portStatusMessage.getReason()) || PortReason.OFPPRMODIFY.equals(portStatusMessage.getReason())) {
378 // because of ADD status node connector has to be created
379 writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
380 .setKey(iiToNodeConnector.getKey())
381 .addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build())
382 .addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector)
384 } else if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
385 addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
390 public void processPacketInMessage(final PacketInMessage packetInMessage) {
391 final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
392 handlePacketInMessage(packetReceived, packetInMessage.getImplementedInterface(), packetReceived.getMatch());
395 private void handlePacketInMessage(final PacketIn packetIn,
396 final Class<?> implementedInterface,
398 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH);
399 final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
401 if (packetIn == null) {
402 LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
403 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
407 final OpenflowVersion openflowVersion = OpenflowVersion.get(deviceInfo.getVersion());
409 // Try to get ingress from match
410 final Optional<NodeConnectorRef> nodeConnectorRef = Optional.ofNullable(match)
411 .flatMap(m -> Optional.ofNullable(m.getInPort()))
412 .flatMap(nodeConnectorId -> Optional.ofNullable(InventoryDataServiceUtil
413 .portNumberfromNodeConnectorId(
416 .map(portNumber -> InventoryDataServiceUtil
417 .nodeConnectorRefFromDatapathIdPortno(
418 deviceInfo.getDatapathId(),
422 if (!nodeConnectorRef.isPresent()) {
423 LOG.debug("Received packet from switch {} but couldn't find an input port", connectionAdapter.getRemoteAddress());
424 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
428 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
430 if (!packetInLimiter.acquirePermit()) {
431 LOG.debug("Packet limited");
432 // TODO: save packet into emergency slot if possible
433 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
437 final ListenableFuture<?> offerNotification = notificationPublishService
438 .offerNotification(new PacketReceivedBuilder(packetIn)
439 .setIngress(nodeConnectorRef.get())
440 .setMatch(MatchUtil.transformMatch(match,
441 org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.packet.received
445 if (NotificationPublishService.REJECTED.equals(offerNotification)) {
446 LOG.debug("notification offer rejected");
447 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
448 packetInLimiter.drainLowWaterMark();
449 packetInLimiter.releasePermit();
453 Futures.addCallback(offerNotification, new FutureCallback<Object>() {
455 public void onSuccess(final Object result) {
456 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
457 packetInLimiter.releasePermit();
461 public void onFailure(final Throwable t) {
462 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
463 LOG.debug("notification offer failed: {}", t.getMessage());
464 LOG.trace("notification offer failed..", t);
465 packetInLimiter.releasePermit();
471 public void processExperimenterMessage(final ExperimenterMessage notification) {
473 final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
474 final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
475 getDeviceInfo().getVersion(),
476 (Class<? extends ExperimenterDataOfChoice>) vendorData.getImplementedInterface());
477 final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter = extensionConverterProvider.getMessageConverter(key);
478 if (messageConverter == null) {
479 LOG.warn("custom converter for {}[OF:{}] not found",
480 notification.getExperimenterDataOfChoice().getImplementedInterface(),
481 getDeviceInfo().getVersion());
484 // build notification
485 final ExperimenterMessageOfChoice messageOfChoice;
487 messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
488 final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new ExperimenterMessageFromDevBuilder()
489 .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
490 .setExperimenterMessageOfChoice(messageOfChoice);
492 notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
493 } catch (final ConversionException e) {
494 LOG.error("Conversion of experimenter notification failed", e);
499 public void processAlienMessage(final OfHeader message) {
500 final Class<? extends DataContainer> implementedInterface = message.getImplementedInterface();
502 if (org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
503 .PacketInMessage.class.equals(implementedInterface)) {
504 final org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
505 .PacketInMessage packetInMessage = org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
506 .PacketInMessage.class.cast(message);
507 handlePacketInMessage(packetInMessage, implementedInterface, packetInMessage.getMatch());
512 public TranslatorLibrary oook() {
513 return translatorLibrary;
517 public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
518 this.notificationPublishService = notificationPublishService;
522 public MessageSpy getMessageSpy() {
527 public void onPublished() {
528 Verify.verify(ContextState.INITIALIZATION.equals(state));
529 this.state = ContextState.WORKING;
530 primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
534 public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>> requestContext) {
535 return new MultiMsgCollectorImpl<>(this, requestContext);
539 public void updatePacketInRateLimit(final long upperBound) {
540 packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound), (int) (HIGH_WATERMARK_FACTOR * upperBound));
544 public ItemLifeCycleRegistry getItemLifeCycleSourceRegistry() {
545 return itemLifeCycleSourceRegistry;
549 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
550 this.extensionConverterProvider = extensionConverterProvider;
554 public ExtensionConverterProvider getExtensionConverterProvider() {
555 return extensionConverterProvider;
559 TransactionChainManager getTransactionChainManager() {
560 return this.transactionChainManager;
564 public ListenableFuture<Void> stopClusterServices() {
566 ? transactionChainManager.deactivateTransactionManager()
567 : Futures.immediateFuture(null);
571 public ServiceGroupIdentifier getServiceIdentifier() {
572 return this.deviceInfo.getServiceIdentifier();
576 public DeviceInfo getDeviceInfo() {
577 return this.deviceInfo;
581 public void close() {
582 if (ContextState.TERMINATION.equals(state)) {
583 if (LOG.isDebugEnabled()) {
584 LOG.debug("DeviceContext for node {} is already in TERMINATION state.", getDeviceInfo().getLOGValue());
590 state = ContextState.TERMINATION;
592 // Close all datastore registries and transactions
595 deviceGroupRegistry.close();
596 deviceFlowRegistry.close();
597 deviceMeterRegistry.close();
599 Futures.addCallback(transactionChainManager.shuttingDown(), new FutureCallback<Void>() {
601 public void onSuccess(@Nullable final Void result) {
602 transactionChainManager.close();
603 transactionChainManager = null;
607 public void onFailure(final Throwable t) {
608 transactionChainManager.close();
609 transactionChainManager = null;
614 for (final Iterator<RequestContext<?>> iterator = Iterators
615 .consumingIterator(requestContexts.iterator()); iterator.hasNext();) {
616 RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), "Connection closed.");
621 public boolean canUseSingleLayerSerialization() {
622 return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
626 public void setSalRoleService(@Nonnull SalRoleService salRoleService) {
627 this.salRoleService = salRoleService;
631 public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) {
632 this.clusterInitializationPhaseHandler = handler;
636 public boolean onContextInstantiateService(final MastershipChangeListener mastershipChangeListener) {
637 LOG.info("Starting device context cluster services for node {}", deviceInfo.getLOGValue());
638 lazyTransactionManagerInitialization();
641 final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
642 .retrieveAndClearPortStatusMessages();
644 portStatusMessages.forEach(this::writePortStatusMessage);
646 } catch (final Exception ex) {
647 LOG.warn("Error processing port status messages from device {}", getDeviceInfo().getLOGValue(), ex);
652 final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
653 .lookup(deviceInfo.getVersion());
655 if (initializer.isPresent()) {
658 .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor)
659 .get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
661 throw new ExecutionException(new ConnectionException("Unsupported version " + deviceInfo.getVersion()));
663 } catch (ExecutionException | InterruptedException | TimeoutException ex) {
664 LOG.warn("Device {} cannot be initialized: {}", deviceInfo.getLOGValue(), ex.getMessage());
665 LOG.trace("Device {} cannot be initialized: ", deviceInfo.getLOGValue(), ex);
669 Futures.addCallback(sendRoleChangeToDevice(OfpRole.BECOMEMASTER),
670 new RpcResultFutureCallback(mastershipChangeListener));
672 final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill = getDeviceFlowRegistry().fill();
673 Futures.addCallback(deviceFlowRegistryFill,
674 new DeviceFlowRegistryCallback(deviceFlowRegistryFill, mastershipChangeListener));
676 return this.clusterInitializationPhaseHandler.onContextInstantiateService(mastershipChangeListener);
680 void lazyTransactionManagerInitialization() {
681 if (!this.initialized) {
682 if (LOG.isDebugEnabled()) {
683 LOG.debug("Transaction chain manager for node {} created", deviceInfo.getLOGValue());
685 this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo);
686 this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker, deviceInfo.getNodeInstanceIdentifier());
687 this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
688 this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
689 this.transactionChainManager.activateTransactionManager();
690 this.initialized = true;
696 public <T> RequestContext<T> createRequestContext() {
697 final Long xid = deviceInfo.reserveXidForDeviceMessage();
699 final AbstractRequestContext<T> abstractRequestContext = new AbstractRequestContext<T>(xid) {
701 public void close() {
702 requestContexts.remove(this);
706 requestContexts.add(abstractRequestContext);
707 return abstractRequestContext;
710 private ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole) {
711 if (LOG.isDebugEnabled()) {
712 LOG.debug("Sending new role {} to device {}", newRole, deviceInfo.getNodeId());
715 final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
717 if (deviceInfo.getVersion() >= OFConstants.OFP_VERSION_1_3) {
718 final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole)
719 .setNode(new NodeRef(deviceInfo.getNodeInstanceIdentifier())).build();
721 setRoleOutputFuture = this.salRoleService.setRole(setRoleInput);
723 final TimerTask timerTask = timeout -> {
724 if (!setRoleOutputFuture.isDone()) {
725 LOG.warn("New role {} was not propagated to device {} during {} sec", newRole, deviceInfo.getLOGValue(), SET_ROLE_TIMEOUT);
726 setRoleOutputFuture.cancel(true);
730 hashedWheelTimer.newTimeout(timerTask, SET_ROLE_TIMEOUT, TimeUnit.SECONDS);
732 LOG.info("Device: {} with version: {} does not support role", deviceInfo.getLOGValue(), deviceInfo.getVersion());
733 return Futures.immediateFuture(null);
736 return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture);
740 public ListenableFuture<RpcResult<SetRoleOutput>> makeDeviceSlave() {
741 return sendRoleChangeToDevice(OfpRole.BECOMESLAVE);
745 public void onStateAcquired(final ContextChainState state) {
749 private class RpcResultFutureCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
751 private final MastershipChangeListener mastershipChangeListener;
753 RpcResultFutureCallback(final MastershipChangeListener mastershipChangeListener) {
754 this.mastershipChangeListener = mastershipChangeListener;
758 public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
759 this.mastershipChangeListener.onMasterRoleAcquired(
761 ContextChainMastershipState.MASTER_ON_DEVICE
763 if (LOG.isDebugEnabled()) {
764 LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo.getLOGValue());
769 public void onFailure(final Throwable throwable) {
770 mastershipChangeListener.onNotAbleToStartMastershipMandatory(
772 "Was not able to set MASTER role on device");
776 private class DeviceFlowRegistryCallback implements FutureCallback<List<com.google.common.base.Optional<FlowCapableNode>>> {
777 private final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill;
778 private final MastershipChangeListener mastershipChangeListener;
/**
 * @param deviceFlowRegistryFill   future this callback is attached to; kept so that
 *                                 {@code onFailure} can tell a cancellation from a failure
 * @param mastershipChangeListener listener informed about the outcome of the fill
 */
DeviceFlowRegistryCallback(
        final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill,
        final MastershipChangeListener mastershipChangeListener) {
    this.mastershipChangeListener = mastershipChangeListener;
    this.deviceFlowRegistryFill = deviceFlowRegistryFill;
}
788 public void onSuccess(@Nullable List<com.google.common.base.Optional<FlowCapableNode>> result) {
789 if (LOG.isDebugEnabled()) {
790 // Count all flows we read from datastore for debugging purposes.
791 // This number do not always represent how many flows were actually added
792 // to DeviceFlowRegistry, because of possible duplicates.
793 long flowCount = Optional.ofNullable(result)
794 .map(Collections::singleton)
795 .orElse(Collections.emptySet())
797 .flatMap(Collection::stream)
798 .filter(Objects::nonNull)
799 .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
800 .filter(Objects::nonNull)
801 .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
802 .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
803 .filter(Objects::nonNull)
804 .filter(table -> Objects.nonNull(table.getFlow()))
805 .flatMap(table -> table.getFlow().stream())
806 .filter(Objects::nonNull)
809 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo.getLOGValue());
811 this.mastershipChangeListener.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_FLOW_REGISTRY_FILL);
815 public void onFailure(Throwable t) {
816 if (deviceFlowRegistryFill.isCancelled()) {
817 if (LOG.isDebugEnabled()) {
818 LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo.getLOGValue());
821 LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo.getLOGValue(), t);
823 mastershipChangeListener.onNotAbleToStartMastership(
825 "Was not able to fill flow registry on device",