2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Verify;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.JdkFutureAdapters;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.TimerTask;
18 import java.util.Collection;
19 import java.util.Collections;
20 import java.util.HashSet;
21 import java.util.List;
22 import java.util.Objects;
23 import java.util.Optional;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.Future;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28 import java.util.concurrent.atomic.AtomicBoolean;
29 import java.util.concurrent.atomic.AtomicReference;
30 import javax.annotation.Nonnull;
31 import javax.annotation.Nullable;
32 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
33 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
34 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
35 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
36 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
37 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
38 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
39 import org.opendaylight.openflowplugin.api.OFConstants;
40 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
42 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
44 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
45 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
46 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
47 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
48 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
49 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
50 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
51 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
52 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
53 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
54 import org.opendaylight.openflowplugin.api.openflow.registry.ItemLifeCycleRegistry;
55 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
56 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor;
57 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryKey;
58 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
59 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
60 import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleKeeper;
61 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
62 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
63 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
64 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
65 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
66 import org.opendaylight.openflowplugin.extension.api.exception.ConversionException;
67 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
68 import org.opendaylight.openflowplugin.impl.common.ItemLifeCycleSourceImpl;
69 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
70 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
71 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
72 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
73 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
74 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
75 import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
76 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
77 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
78 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
79 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
80 import org.opendaylight.openflowplugin.impl.util.MatchUtil;
81 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
82 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.Match;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
102 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
103 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
104 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
105 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
106 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketIn;
107 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
108 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder;
109 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
110 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
111 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
112 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService;
113 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput;
114 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder;
115 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
116 import org.opendaylight.yangtools.yang.binding.DataContainer;
117 import org.opendaylight.yangtools.yang.binding.DataObject;
118 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
119 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
120 import org.opendaylight.yangtools.yang.common.RpcResult;
121 import org.slf4j.Logger;
122 import org.slf4j.LoggerFactory;
124 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper {
126 private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
128 // TODO: drain factor should be parametrized
129 private static final float REJECTED_DRAIN_FACTOR = 0.25f;
130 // TODO: low water mark factor should be parametrized
131 private static final float LOW_WATERMARK_FACTOR = 0.75f;
132 // TODO: high water mark factor should be parametrized
133 private static final float HIGH_WATERMARK_FACTOR = 0.95f;
135 // Timeout in seconds after what we will give up on propagating role
136 private static final int SET_ROLE_TIMEOUT = 10;
138 // Timeout in milliseconds after what we will give up on initializing device
139 private static final int DEVICE_INIT_TIMEOUT = 9000;
141 // Timeout in milliseconds after what we will give up on closing transaction chain
142 private static final int TX_CHAIN_CLOSE_TIMEOUT = 10000;
144 private static final int LOW_WATERMARK = 1000;
145 private static final int HIGH_WATERMARK = 2000;
147 private final MultipartWriterProvider writerProvider;
148 private final HashedWheelTimer hashedWheelTimer;
149 private final DeviceState deviceState;
150 private final DataBroker dataBroker;
151 private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
152 private final MessageSpy messageSpy;
153 private final ItemLifeCycleKeeper flowLifeCycleKeeper;
154 private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
155 private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
156 private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
157 private final TranslatorLibrary translatorLibrary;
158 private final ItemLifeCycleRegistry itemLifeCycleSourceRegistry;
159 private final ConvertorExecutor convertorExecutor;
160 private final DeviceInitializerProvider deviceInitializerProvider;
161 private final PacketInRateLimiter packetInLimiter;
162 private final DeviceInfo deviceInfo;
163 private final ConnectionContext primaryConnectionContext;
164 private final boolean skipTableFeatures;
165 private final boolean switchFeaturesMandatory;
166 private final boolean isFlowRemovedNotificationOn;
167 private final boolean useSingleLayerSerialization;
168 private final AtomicBoolean initialized = new AtomicBoolean(false);
169 private final AtomicBoolean hasState = new AtomicBoolean(false);
170 private final AtomicBoolean isInitialTransactionSubmitted = new AtomicBoolean(false);
171 private final AtomicReference<ContextState> state = new AtomicReference<>(ContextState.INITIALIZATION);
172 private NotificationPublishService notificationPublishService;
173 private TransactionChainManager transactionChainManager;
174 private DeviceFlowRegistry deviceFlowRegistry;
175 private DeviceGroupRegistry deviceGroupRegistry;
176 private DeviceMeterRegistry deviceMeterRegistry;
177 private ExtensionConverterProvider extensionConverterProvider;
178 private SalRoleService salRoleService;
179 private ContextChainMastershipWatcher contextChainMastershipWatcher;
182 @Nonnull final ConnectionContext primaryConnectionContext,
183 @Nonnull final DataBroker dataBroker,
184 @Nonnull final MessageSpy messageSpy,
185 @Nonnull final TranslatorLibrary translatorLibrary,
186 final ConvertorExecutor convertorExecutor,
187 final boolean skipTableFeatures,
188 final HashedWheelTimer hashedWheelTimer,
189 final boolean useSingleLayerSerialization,
190 final DeviceInitializerProvider deviceInitializerProvider,
191 final boolean isFlowRemovedNotificationOn,
192 final boolean switchFeaturesMandatory) {
194 this.primaryConnectionContext = primaryConnectionContext;
195 this.deviceInfo = primaryConnectionContext.getDeviceInfo();
196 this.hashedWheelTimer = hashedWheelTimer;
197 this.deviceInitializerProvider = deviceInitializerProvider;
198 this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
199 this.switchFeaturesMandatory = switchFeaturesMandatory;
200 this.deviceState = new DeviceStateImpl();
201 this.dataBroker = dataBroker;
202 this.messageSpy = messageSpy;
204 this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
205 /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
207 this.translatorLibrary = translatorLibrary;
208 this.portStatusTranslator = translatorLibrary.lookupTranslator(
209 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
210 this.packetInTranslator = translatorLibrary.lookupTranslator(
211 new TranslatorKey(deviceInfo.getVersion(), org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
212 .PacketIn.class.getName()));
213 this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
214 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
216 this.itemLifeCycleSourceRegistry = new ItemLifeCycleRegistryImpl();
217 this.flowLifeCycleKeeper = new ItemLifeCycleSourceImpl();
218 this.itemLifeCycleSourceRegistry.registerLifeCycleSource(flowLifeCycleKeeper);
219 this.convertorExecutor = convertorExecutor;
220 this.skipTableFeatures = skipTableFeatures;
221 this.useSingleLayerSerialization = useSingleLayerSerialization;
222 writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
226 public boolean initialSubmitTransaction() {
227 if (!initialized.get()) {
231 final boolean initialSubmit = transactionChainManager.initialSubmitWriteTransaction();
232 isInitialTransactionSubmitted.set(initialSubmit);
233 return initialSubmit;
237 public DeviceState getDeviceState() {
242 public ReadOnlyTransaction getReadTransaction() {
243 return dataBroker.newReadOnlyTransaction();
247 public boolean isTransactionsEnabled() {
248 return isInitialTransactionSubmitted.get();
252 public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
253 final InstanceIdentifier<T> path,
255 if (initialized.get()) {
256 transactionChainManager.writeToTransaction(store, path, data, false);
261 public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
262 final InstanceIdentifier<T> path,
264 if (initialized.get()) {
265 transactionChainManager.writeToTransaction(store, path, data, true);
270 public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store, final InstanceIdentifier<T> path) {
271 if (initialized.get()) {
272 transactionChainManager.addDeleteOperationTotTxChain(store, path);
277 public boolean submitTransaction() {
278 return initialized.get() && transactionChainManager.submitWriteTransaction();
282 public ConnectionContext getPrimaryConnectionContext() {
283 return primaryConnectionContext;
287 public DeviceFlowRegistry getDeviceFlowRegistry() {
288 return deviceFlowRegistry;
292 public DeviceGroupRegistry getDeviceGroupRegistry() {
293 return deviceGroupRegistry;
297 public DeviceMeterRegistry getDeviceMeterRegistry() {
298 return deviceMeterRegistry;
302 public void processReply(final OfHeader ofHeader) {
303 messageSpy.spyMessage(
304 ofHeader.getImplementedInterface(),
305 (ofHeader instanceof Error)
306 ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
307 : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
311 public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
312 ofHeaderList.forEach(header -> messageSpy.spyMessage(
313 header.getImplementedInterface(),
314 (header instanceof Error)
315 ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
316 : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
320 public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
321 //1. translate to general flow (table, priority, match, cookie)
322 final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification =
323 flowRemovedTranslator.translate(flowRemoved, deviceInfo, null);
325 if (isFlowRemovedNotificationOn) {
326 // Trigger off a notification
327 notificationPublishService.offerNotification(flowRemovedNotification);
330 final ItemLifecycleListener itemLifecycleListener = flowLifeCycleKeeper.getItemLifecycleListener();
331 if (itemLifecycleListener != null) {
332 //2. create registry key
333 final FlowRegistryKey flowRegKey = FlowRegistryKeyFactory.create(getDeviceInfo().getVersion(), flowRemovedNotification);
335 final FlowDescriptor flowDescriptor = deviceFlowRegistry.retrieveDescriptor(flowRegKey);
336 //4. if flowId present:
337 if (flowDescriptor != null) {
338 // a) construct flow path
339 final KeyedInstanceIdentifier<Flow, FlowKey> flowPath = getDeviceInfo().getNodeInstanceIdentifier()
340 .augmentation(FlowCapableNode.class)
341 .child(Table.class, flowDescriptor.getTableKey())
342 .child(Flow.class, new FlowKey(flowDescriptor.getFlowId()));
343 // b) notify listener
344 itemLifecycleListener.onRemoved(flowPath);
346 LOG.debug("flow id not found: nodeId={} tableId={}, priority={}",
347 getDeviceInfo().getNodeId(), flowRegKey.getTableId(), flowRemovedNotification.getPriority());
353 public void processPortStatusMessage(final PortStatusMessage portStatus) {
354 messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
356 if (initialized.get()) {
358 writePortStatusMessage(portStatus);
360 } catch (final Exception e) {
361 LOG.warn("Error processing port status message for port {} on device {}",
362 portStatus.getPortNo(), getDeviceInfo().getLOGValue(), e);
364 } else if (!hasState.get()) {
365 primaryConnectionContext.handlePortStatusMessage(portStatus);
369 private void writePortStatusMessage(final PortStatus portStatusMessage) {
370 final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
371 .translate(portStatusMessage, getDeviceInfo(), null);
373 final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
374 .getNodeInstanceIdentifier()
375 .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
376 .nodeConnectorIdfromDatapathPortNo(
377 deviceInfo.getDatapathId(),
378 portStatusMessage.getPortNo(),
379 OpenflowVersion.get(deviceInfo.getVersion()))));
381 if (PortReason.OFPPRADD.equals(portStatusMessage.getReason()) || PortReason.OFPPRMODIFY.equals(portStatusMessage.getReason())) {
382 // because of ADD status node connector has to be created
383 writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
384 .setKey(iiToNodeConnector.getKey())
385 .addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build())
386 .addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector)
388 } else if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
389 addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
394 public void processPacketInMessage(final PacketInMessage packetInMessage) {
395 final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
396 handlePacketInMessage(packetReceived, packetInMessage.getImplementedInterface(), packetReceived.getMatch());
399 private void handlePacketInMessage(final PacketIn packetIn,
400 final Class<?> implementedInterface,
402 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH);
403 final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
405 if (packetIn == null) {
406 LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
407 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
411 final OpenflowVersion openflowVersion = OpenflowVersion.get(deviceInfo.getVersion());
413 // Try to get ingress from match
414 final NodeConnectorRef nodeConnectorRef = Objects.nonNull(packetIn.getIngress())
415 ? packetIn.getIngress() : Optional.ofNullable(match)
416 .map(Match::getInPort)
417 .map(nodeConnectorId -> InventoryDataServiceUtil
418 .portNumberfromNodeConnectorId(
421 .map(portNumber -> InventoryDataServiceUtil
422 .nodeConnectorRefFromDatapathIdPortno(
423 deviceInfo.getDatapathId(),
428 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
430 if (!packetInLimiter.acquirePermit()) {
431 LOG.debug("Packet limited");
432 // TODO: save packet into emergency slot if possible
433 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
437 final ListenableFuture<?> offerNotification = notificationPublishService
438 .offerNotification(new PacketReceivedBuilder(packetIn)
439 .setIngress(nodeConnectorRef)
440 .setMatch(MatchUtil.transformMatch(match,
441 org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.packet.received
445 if (NotificationPublishService.REJECTED.equals(offerNotification)) {
446 LOG.debug("notification offer rejected");
447 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
448 packetInLimiter.drainLowWaterMark();
449 packetInLimiter.releasePermit();
453 Futures.addCallback(offerNotification, new FutureCallback<Object>() {
455 public void onSuccess(final Object result) {
456 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
457 packetInLimiter.releasePermit();
461 public void onFailure(final Throwable t) {
462 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
463 LOG.debug("notification offer failed: {}", t.getMessage());
464 LOG.trace("notification offer failed..", t);
465 packetInLimiter.releasePermit();
471 public void processExperimenterMessage(final ExperimenterMessage notification) {
473 final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
474 final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
475 getDeviceInfo().getVersion(),
476 (Class<? extends ExperimenterDataOfChoice>) vendorData.getImplementedInterface());
477 final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter = extensionConverterProvider.getMessageConverter(key);
478 if (messageConverter == null) {
479 LOG.warn("custom converter for {}[OF:{}] not found",
480 notification.getExperimenterDataOfChoice().getImplementedInterface(),
481 getDeviceInfo().getVersion());
484 // build notification
485 final ExperimenterMessageOfChoice messageOfChoice;
487 messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
488 final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new ExperimenterMessageFromDevBuilder()
489 .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
490 .setExperimenterMessageOfChoice(messageOfChoice);
492 notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
493 } catch (final ConversionException e) {
494 LOG.error("Conversion of experimenter notification failed", e);
499 public boolean processAlienMessage(final OfHeader message) {
500 final Class<? extends DataContainer> implementedInterface = message.getImplementedInterface();
502 if (Objects.nonNull(implementedInterface) && org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
503 .rev130709.PacketInMessage.class.equals(implementedInterface)) {
504 final org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
505 .PacketInMessage packetInMessage = org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
506 .rev130709.PacketInMessage.class.cast(message);
508 handlePacketInMessage(packetInMessage, implementedInterface, packetInMessage.getMatch());
516 public TranslatorLibrary oook() {
517 return translatorLibrary;
521 public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
522 this.notificationPublishService = notificationPublishService;
526 public MessageSpy getMessageSpy() {
531 public void onPublished() {
532 Verify.verify(ContextState.INITIALIZATION.equals(state.get()));
533 state.set(ContextState.WORKING);
534 primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
538 public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>> requestContext) {
539 return new MultiMsgCollectorImpl<>(this, requestContext);
543 public void updatePacketInRateLimit(final long upperBound) {
544 packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound), (int) (HIGH_WATERMARK_FACTOR * upperBound));
548 public ItemLifeCycleRegistry getItemLifeCycleSourceRegistry() {
549 return itemLifeCycleSourceRegistry;
553 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
554 this.extensionConverterProvider = extensionConverterProvider;
558 public ExtensionConverterProvider getExtensionConverterProvider() {
559 return extensionConverterProvider;
563 TransactionChainManager getTransactionChainManager() {
564 return this.transactionChainManager;
568 public ListenableFuture<Void> closeServiceInstance() {
569 LOG.info("Stopping device context cluster services for node {}", deviceInfo.getLOGValue());
571 final ListenableFuture<Void> listenableFuture = initialized.get()
572 ? transactionChainManager.deactivateTransactionManager()
573 : Futures.immediateFuture(null);
575 hashedWheelTimer.newTimeout((t) -> {
576 if (!listenableFuture.isDone() && !listenableFuture.isCancelled()) {
577 listenableFuture.cancel(true);
579 }, TX_CHAIN_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS);
581 return listenableFuture;
585 public DeviceInfo getDeviceInfo() {
586 return this.deviceInfo;
590 public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher) {
591 this.contextChainMastershipWatcher = contextChainMastershipWatcher;
596 public ServiceGroupIdentifier getIdentifier() {
597 return deviceInfo.getServiceIdentifier();
601 public void close() {
602 if (ContextState.TERMINATION.equals(state)) {
603 if (LOG.isDebugEnabled()) {
604 LOG.debug("DeviceContext for node {} is already in TERMINATION state.", getDeviceInfo());
610 state.set(ContextState.TERMINATION);
612 // Close all datastore registries and transactions
613 if (initialized.get()) {
614 initialized.set(false);
615 deviceGroupRegistry.close();
616 deviceFlowRegistry.close();
617 deviceMeterRegistry.close();
619 final ListenableFuture<Void> txChainShuttingDown = transactionChainManager.shuttingDown();
622 txChainShuttingDown.get(TX_CHAIN_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS);
623 } catch (InterruptedException | ExecutionException | TimeoutException e) {
624 txChainShuttingDown.cancel(true);
625 LOG.warn("Failed to shut down transaction chain for device {}: {}", deviceInfo, e);
628 transactionChainManager.close();
629 transactionChainManager = null;
632 requestContexts.forEach(requestContext -> RequestContextUtil
633 .closeRequestContextWithRpcError(requestContext, "Connection closed."));
634 requestContexts.clear();
638 public boolean canUseSingleLayerSerialization() {
639 return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
643 public void setSalRoleService(@Nonnull SalRoleService salRoleService) {
644 this.salRoleService = salRoleService;
648 public void instantiateServiceInstance() {
649 LOG.info("Starting device context cluster services for node {}", deviceInfo);
650 lazyTransactionManagerInitialization();
653 final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
654 .retrieveAndClearPortStatusMessages();
656 portStatusMessages.forEach(this::writePortStatusMessage);
658 } catch (final Exception ex) {
659 throw new RuntimeException(String.format("Error processing port status messages from device %s: %s",
660 deviceInfo.toString(),
664 final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
665 .lookup(deviceInfo.getVersion());
667 if (initializer.isPresent()) {
668 final Future<Void> initialize = initializer
670 .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor);
673 initialize.get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
674 } catch (TimeoutException ex) {
675 initialize.cancel(true);
676 throw new RuntimeException(String.format("Failed to initialize device %s in %ss: %s",
677 deviceInfo.toString(),
678 String.valueOf(DEVICE_INIT_TIMEOUT / 1000),
680 } catch (ExecutionException | InterruptedException ex) {
681 throw new RuntimeException(String.format("Device %s cannot be initialized: %s",
682 deviceInfo.toString(),
686 throw new RuntimeException(String.format("Unsupported version %s for device %s",
687 deviceInfo.getVersion(),
688 deviceInfo.toString()));
691 Futures.addCallback(sendRoleChangeToDevice(OfpRole.BECOMEMASTER),
692 new RpcResultFutureCallback(contextChainMastershipWatcher));
694 final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill = getDeviceFlowRegistry().fill();
695 Futures.addCallback(deviceFlowRegistryFill,
696 new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher));
700 void lazyTransactionManagerInitialization() {
701 if (!this.initialized.get()) {
702 if (LOG.isDebugEnabled()) {
703 LOG.debug("Transaction chain manager for node {} created", deviceInfo.getLOGValue());
705 this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo);
706 this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker, deviceInfo.getNodeInstanceIdentifier());
707 this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
708 this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
711 transactionChainManager.activateTransactionManager();
712 initialized.set(true);
717 public <T> RequestContext<T> createRequestContext() {
718 final Long xid = deviceInfo.reserveXidForDeviceMessage();
720 final AbstractRequestContext<T> abstractRequestContext = new AbstractRequestContext<T>(xid) {
722 public void close() {
723 requestContexts.remove(this);
727 requestContexts.add(abstractRequestContext);
728 return abstractRequestContext;
731 private ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole) {
732 if (LOG.isDebugEnabled()) {
733 LOG.debug("Sending new role {} to device {}", newRole, deviceInfo.getNodeId());
736 final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
738 if (deviceInfo.getVersion() >= OFConstants.OFP_VERSION_1_3) {
739 final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole)
740 .setNode(new NodeRef(deviceInfo.getNodeInstanceIdentifier())).build();
742 setRoleOutputFuture = this.salRoleService.setRole(setRoleInput);
744 final TimerTask timerTask = timeout -> {
745 if (!setRoleOutputFuture.isDone()) {
746 LOG.warn("New role {} was not propagated to device {} during {} sec", newRole, deviceInfo.getLOGValue(), SET_ROLE_TIMEOUT);
747 setRoleOutputFuture.cancel(true);
751 hashedWheelTimer.newTimeout(timerTask, SET_ROLE_TIMEOUT, TimeUnit.SECONDS);
753 LOG.info("Device: {} with version: {} does not support role", deviceInfo.getLOGValue(), deviceInfo.getVersion());
754 return Futures.immediateFuture(null);
757 return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture);
761 public ListenableFuture<RpcResult<SetRoleOutput>> makeDeviceSlave() {
762 return sendRoleChangeToDevice(OfpRole.BECOMESLAVE);
766 public void onStateAcquired(final ContextChainState state) {
770 private class RpcResultFutureCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
772 private final ContextChainMastershipWatcher contextChainMastershipWatcher;
774 RpcResultFutureCallback(final ContextChainMastershipWatcher contextChainMastershipWatcher) {
775 this.contextChainMastershipWatcher = contextChainMastershipWatcher;
779 public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
780 this.contextChainMastershipWatcher.onMasterRoleAcquired(
782 ContextChainMastershipState.MASTER_ON_DEVICE
784 if (LOG.isDebugEnabled()) {
785 LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo);
790 public void onFailure(final Throwable throwable) {
791 contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(
793 "Was not able to set MASTER role on device");
797 private class DeviceFlowRegistryCallback implements FutureCallback<List<com.google.common.base.Optional<FlowCapableNode>>> {
798 private final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill;
799 private final ContextChainMastershipWatcher contextChainMastershipWatcher;
801 DeviceFlowRegistryCallback(
802 ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill,
803 ContextChainMastershipWatcher contextChainMastershipWatcher) {
804 this.deviceFlowRegistryFill = deviceFlowRegistryFill;
805 this.contextChainMastershipWatcher = contextChainMastershipWatcher;
809 public void onSuccess(@Nullable List<com.google.common.base.Optional<FlowCapableNode>> result) {
810 if (LOG.isDebugEnabled()) {
811 // Count all flows we read from datastore for debugging purposes.
812 // This number do not always represent how many flows were actually added
813 // to DeviceFlowRegistry, because of possible duplicates.
814 long flowCount = Optional.ofNullable(result)
815 .map(Collections::singleton)
816 .orElse(Collections.emptySet())
818 .flatMap(Collection::stream)
819 .filter(Objects::nonNull)
820 .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
821 .filter(Objects::nonNull)
822 .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
823 .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
824 .filter(Objects::nonNull)
825 .filter(table -> Objects.nonNull(table.getFlow()))
826 .flatMap(table -> table.getFlow().stream())
827 .filter(Objects::nonNull)
830 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo.getLOGValue());
832 this.contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_FLOW_REGISTRY_FILL);
836 public void onFailure(Throwable t) {
837 if (deviceFlowRegistryFill.isCancelled()) {
838 if (LOG.isDebugEnabled()) {
839 LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo.getLOGValue());
842 LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo.getLOGValue(), t);
844 contextChainMastershipWatcher.onNotAbleToStartMastership(
846 "Was not able to fill flow registry on device",