2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import io.netty.util.HashedWheelTimer;
17 import java.util.Collection;
18 import java.util.List;
19 import java.util.Objects;
20 import java.util.Optional;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.eclipse.jdt.annotation.Nullable;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
31 import org.opendaylight.mdsal.binding.api.ReadTransaction;
32 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
33 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
34 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
35 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
36 import org.opendaylight.openflowplugin.api.OFConstants;
37 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
41 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
42 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
43 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
44 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
45 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
46 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder;
48 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
49 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
50 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
51 import org.opendaylight.openflowplugin.api.openflow.lifecycle.DeviceInitializationContext;
52 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
53 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
54 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
55 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
56 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
57 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
58 import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
59 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
60 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
61 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
62 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
63 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
64 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
65 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
66 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
67 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
68 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
69 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
70 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
71 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
72 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
73 import org.opendaylight.openflowplugin.impl.util.MatchUtil;
74 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
75 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.Match;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketIn;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
101 import org.opendaylight.yangtools.util.concurrent.NotificationManager;
102 import org.opendaylight.yangtools.yang.binding.DataContainer;
103 import org.opendaylight.yangtools.yang.binding.DataObject;
104 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
105 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
106 import org.slf4j.Logger;
107 import org.slf4j.LoggerFactory;
109 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper, DeviceInitializationContext {
111 private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
112 private static final Logger OF_EVENT_LOG = LoggerFactory.getLogger("OfEventLog");
114 // TODO: drain factor should be parametrized
115 private static final float REJECTED_DRAIN_FACTOR = 0.25f;
116 // TODO: low water mark factor should be parametrized
117 private static final float LOW_WATERMARK_FACTOR = 0.75f;
118 // TODO: high water mark factor should be parametrized
119 private static final float HIGH_WATERMARK_FACTOR = 0.95f;
121 // Timeout in milliseconds after what we will give up on initializing device
122 private static final int DEVICE_INIT_TIMEOUT = 9000;
124 // Timeout in milliseconds after what we will give up on closing transaction chain
125 private static final int TX_CHAIN_CLOSE_TIMEOUT = 10000;
127 private static final int LOW_WATERMARK = 1000;
128 private static final int HIGH_WATERMARK = 2000;
130 private final MultipartWriterProvider writerProvider;
131 private final HashedWheelTimer hashedWheelTimer;
132 private final DeviceState deviceState;
133 private final DataBroker dataBroker;
134 private final Collection<RequestContext<?>> requestContexts = ConcurrentHashMap.newKeySet();
135 private final MessageSpy messageSpy;
136 private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
137 private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
138 private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight
139 .flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
140 private final TranslatorLibrary translatorLibrary;
141 private final ConvertorExecutor convertorExecutor;
142 private final DeviceInitializerProvider deviceInitializerProvider;
143 private final PacketInRateLimiter packetInLimiter;
144 private final DeviceInfo deviceInfo;
145 private final ConnectionContext primaryConnectionContext;
146 private final boolean skipTableFeatures;
147 private final boolean switchFeaturesMandatory;
148 private final boolean isFlowRemovedNotificationOn;
149 private final boolean useSingleLayerSerialization;
150 private final AtomicBoolean initialized = new AtomicBoolean(false);
151 private final AtomicBoolean hasState = new AtomicBoolean(false);
152 private final AtomicBoolean isInitialTransactionSubmitted = new AtomicBoolean(false);
153 private final ContextChainHolder contextChainHolder;
154 private NotificationPublishService notificationPublishService;
155 private TransactionChainManager transactionChainManager;
156 private DeviceFlowRegistry deviceFlowRegistry;
157 private DeviceGroupRegistry deviceGroupRegistry;
158 private DeviceMeterRegistry deviceMeterRegistry;
159 private ExtensionConverterProvider extensionConverterProvider;
160 private ContextChainMastershipWatcher contextChainMastershipWatcher;
161 private final NotificationManager<String, Runnable> queuedNotificationManager;
162 private final boolean isStatisticsPollingOn;
164 DeviceContextImpl(@NonNull final ConnectionContext primaryConnectionContext,
165 @NonNull final DataBroker dataBroker,
166 @NonNull final MessageSpy messageSpy,
167 @NonNull final TranslatorLibrary translatorLibrary,
168 final ConvertorExecutor convertorExecutor,
169 final boolean skipTableFeatures,
170 final HashedWheelTimer hashedWheelTimer,
171 final boolean useSingleLayerSerialization,
172 final DeviceInitializerProvider deviceInitializerProvider,
173 final boolean isFlowRemovedNotificationOn,
174 final boolean switchFeaturesMandatory,
175 final ContextChainHolder contextChainHolder,
176 final NotificationManager queuedNotificationManager,
177 final boolean isStatisticsPollingOn) {
178 this.primaryConnectionContext = primaryConnectionContext;
179 this.deviceInfo = primaryConnectionContext.getDeviceInfo();
180 this.hashedWheelTimer = hashedWheelTimer;
181 this.deviceInitializerProvider = deviceInitializerProvider;
182 this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
183 this.switchFeaturesMandatory = switchFeaturesMandatory;
184 this.deviceState = new DeviceStateImpl();
185 this.dataBroker = dataBroker;
186 this.messageSpy = messageSpy;
187 this.isStatisticsPollingOn = isStatisticsPollingOn;
188 this.contextChainHolder = contextChainHolder;
190 this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
191 /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
193 this.translatorLibrary = translatorLibrary;
194 this.portStatusTranslator = translatorLibrary.lookupTranslator(
195 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
196 this.packetInTranslator = translatorLibrary.lookupTranslator(
197 new TranslatorKey(deviceInfo.getVersion(), org.opendaylight.yang.gen.v1.urn.opendaylight.openflow
199 .PacketIn.class.getName()));
200 this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
201 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
203 this.convertorExecutor = convertorExecutor;
204 this.skipTableFeatures = skipTableFeatures;
205 this.useSingleLayerSerialization = useSingleLayerSerialization;
206 this.queuedNotificationManager = queuedNotificationManager;
207 writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
211 public boolean initialSubmitTransaction() {
212 if (!initialized.get()) {
216 final boolean initialSubmit = transactionChainManager.initialSubmitWriteTransaction();
217 isInitialTransactionSubmitted.set(initialSubmit);
218 return initialSubmit;
222 public DeviceState getDeviceState() {
227 public ReadTransaction getReadTransaction() {
228 return dataBroker.newReadOnlyTransaction();
232 public boolean isTransactionsEnabled() {
233 return isInitialTransactionSubmitted.get();
237 public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
238 final InstanceIdentifier<T> path,
240 if (initialized.get()) {
241 transactionChainManager.writeToTransaction(store, path, data, false);
246 public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
247 final InstanceIdentifier<T> path,
249 if (initialized.get()) {
250 transactionChainManager.writeToTransaction(store, path, data, true);
255 public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store,
256 final InstanceIdentifier<T> path) {
257 if (initialized.get()) {
258 transactionChainManager.addDeleteOperationToTxChain(store, path);
263 public boolean submitTransaction() {
264 return initialized.get() && transactionChainManager.submitTransaction();
268 public boolean syncSubmitTransaction() {
269 return initialized.get() && transactionChainManager.submitTransaction(true);
273 public ConnectionContext getPrimaryConnectionContext() {
274 return primaryConnectionContext;
278 public DeviceFlowRegistry getDeviceFlowRegistry() {
279 return deviceFlowRegistry;
283 public DeviceGroupRegistry getDeviceGroupRegistry() {
284 return deviceGroupRegistry;
288 public boolean isStatisticsPollingOn() {
289 return isStatisticsPollingOn;
293 public DeviceMeterRegistry getDeviceMeterRegistry() {
294 return deviceMeterRegistry;
298 public void processReply(final OfHeader ofHeader) {
299 messageSpy.spyMessage(
300 ofHeader.implementedInterface(),
301 ofHeader instanceof Error
302 ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
303 : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
307 public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
308 ofHeaderList.forEach(header -> messageSpy.spyMessage(
309 header.implementedInterface(),
310 header instanceof Error
311 ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
312 : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
316 public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
317 if (isMasterOfDevice()) {
318 //1. translate to general flow (table, priority, match, cookie)
319 final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819
320 .FlowRemoved flowRemovedNotification = flowRemovedTranslator
321 .translate(flowRemoved, deviceInfo, null);
323 if (isFlowRemovedNotificationOn) {
324 // Trigger off a notification
325 notificationPublishService.offerNotification(flowRemovedNotification);
328 LOG.debug("Controller is not owner of the device {}, skipping Flow Removed message",
329 deviceInfo.getLOGValue());
334 @SuppressWarnings("checkstyle:IllegalCatch")
335 public void processPortStatusMessage(final PortStatusMessage portStatus) {
336 messageSpy.spyMessage(portStatus.implementedInterface(), MessageSpy.StatisticsGroup
337 .FROM_SWITCH_PUBLISHED_SUCCESS);
339 if (initialized.get()) {
341 writePortStatusMessage(portStatus);
342 } catch (final Exception e) {
343 LOG.warn("Error processing port status message for port {} on device {}",
344 portStatus.getPortNo(), getDeviceInfo(), e);
346 } else if (!hasState.get()) {
347 primaryConnectionContext.handlePortStatusMessage(portStatus);
351 @SuppressWarnings("checkstyle:IllegalCatch")
352 private void writePortStatusMessage(final PortStatus portStatusMessage) {
353 String datapathId = deviceInfo.getDatapathId().toString().intern();
354 queuedNotificationManager.submitNotification(datapathId, () -> {
356 acquireWriteTransactionLock();
357 final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
358 .translate(portStatusMessage, getDeviceInfo(), null);
359 OF_EVENT_LOG.debug("Node Connector Status, Node: {}, PortNumber: {}, PortName: {}, Reason: {}",
360 deviceInfo.getDatapathId(), portStatusMessage.getPortNo(), portStatusMessage.getName(),
361 portStatusMessage.getReason());
363 final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
364 .getNodeInstanceIdentifier()
365 .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
366 .nodeConnectorIdfromDatapathPortNo(
367 deviceInfo.getDatapathId(),
368 portStatusMessage.getPortNo(),
369 OpenflowVersion.get(deviceInfo.getVersion()))));
371 writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
372 .withKey(iiToNodeConnector.getKey())
373 .addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new
374 FlowCapableNodeConnectorStatisticsDataBuilder().build())
375 .addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector)
377 syncSubmitTransaction();
378 if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
379 addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
380 syncSubmitTransaction();
382 } catch (final Exception e) {
383 LOG.warn("Error processing port status message for port {} on device {}",
384 portStatusMessage.getPortNo(), datapathId, e);
386 releaseWriteTransactionLock();
392 public void processPacketInMessage(final PacketInMessage packetInMessage) {
393 if (isMasterOfDevice()) {
394 final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
395 handlePacketInMessage(packetReceived, packetInMessage.implementedInterface(), packetReceived.getMatch());
397 LOG.debug("Controller is not owner of the device {}, skipping packet_in message", deviceInfo.getLOGValue());
401 private Boolean isMasterOfDevice() {
402 final ContextChain contextChain = contextChainHolder.getContextChain(deviceInfo);
403 boolean result = false;
404 if (contextChain != null) {
405 result = contextChain.isMastered(ContextChainMastershipState.CHECK, false);
410 private void handlePacketInMessage(final PacketIn packetIn,
411 final Class<?> implementedInterface,
413 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH);
414 final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
416 if (packetIn == null) {
417 LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
418 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
422 final OpenflowVersion openflowVersion = OpenflowVersion.get(deviceInfo.getVersion());
424 // Try to get ingress from match
425 final NodeConnectorRef nodeConnectorRef = packetIn.getIngress() != null
426 ? packetIn.getIngress() : Optional.ofNullable(match)
427 .map(Match::getInPort)
428 .map(nodeConnectorId -> InventoryDataServiceUtil
429 .portNumberfromNodeConnectorId(
432 .map(portNumber -> InventoryDataServiceUtil
433 .nodeConnectorRefFromDatapathIdPortno(
434 deviceInfo.getDatapathId(),
439 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
441 if (!packetInLimiter.acquirePermit()) {
442 LOG.debug("Packet limited");
443 // TODO: save packet into emergency slot if possible
444 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
445 .FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
449 final ListenableFuture<?> offerNotification = notificationPublishService
450 .offerNotification(new PacketReceivedBuilder(packetIn)
451 .setIngress(nodeConnectorRef)
452 .setMatch(MatchUtil.transformMatch(match,
453 org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.packet.received
457 if (NotificationPublishService.REJECTED.equals(offerNotification)) {
458 LOG.debug("notification offer rejected");
459 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
460 packetInLimiter.drainLowWaterMark();
461 packetInLimiter.releasePermit();
465 Futures.addCallback(offerNotification, new FutureCallback<Object>() {
467 public void onSuccess(final Object result) {
468 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
469 packetInLimiter.releasePermit();
473 public void onFailure(final Throwable throwable) {
474 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
475 .FROM_SWITCH_NOTIFICATION_REJECTED);
476 LOG.debug("notification offer failed: {}", throwable.getMessage());
477 LOG.trace("notification offer failed..", throwable);
478 packetInLimiter.releasePermit();
480 }, MoreExecutors.directExecutor());
484 public void processExperimenterMessage(final ExperimenterMessage notification) {
485 if (isMasterOfDevice()) {
487 final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
488 final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
489 getDeviceInfo().getVersion(),
490 (Class<? extends ExperimenterDataOfChoice>) vendorData.implementedInterface());
491 final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter =
492 extensionConverterProvider.getMessageConverter(key);
493 if (messageConverter == null) {
494 LOG.warn("custom converter for {}[OF:{}] not found",
495 notification.getExperimenterDataOfChoice().implementedInterface(),
496 getDeviceInfo().getVersion());
499 // build notification
500 final ExperimenterMessageOfChoice messageOfChoice;
501 messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
502 final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new
503 ExperimenterMessageFromDevBuilder()
504 .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
505 .setExperimenterMessageOfChoice(messageOfChoice);
507 notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
509 LOG.debug("Controller is not owner of the device {}, skipping experimenter message",
510 deviceInfo.getLOGValue());
515 // The cast to PacketInMessage is safe as the implemented interface is verified before the cas tbut FB doesn't
517 @SuppressFBWarnings("BC_UNCONFIRMED_CAST")
518 public boolean processAlienMessage(final OfHeader message) {
519 final Class<? extends DataContainer> implementedInterface = message.implementedInterface();
521 if (org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketInMessage.class
522 .equals(implementedInterface)) {
523 final org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
524 .PacketInMessage packetInMessage = (org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
525 .rev130709.PacketInMessage) message;
527 handlePacketInMessage(packetInMessage, implementedInterface, packetInMessage.getMatch());
535 public TranslatorLibrary oook() {
536 return translatorLibrary;
540 public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
541 this.notificationPublishService = notificationPublishService;
545 public MessageSpy getMessageSpy() {
550 public void onPublished() {
551 primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
555 public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>>
557 return new MultiMsgCollectorImpl<>(this, requestContext);
561 public void updatePacketInRateLimit(final long upperBound) {
562 packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound),
563 (int) (HIGH_WATERMARK_FACTOR * upperBound));
567 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
568 this.extensionConverterProvider = extensionConverterProvider;
572 public ExtensionConverterProvider getExtensionConverterProvider() {
573 return extensionConverterProvider;
577 TransactionChainManager getTransactionChainManager() {
578 return this.transactionChainManager;
582 public ListenableFuture<?> closeServiceInstance() {
583 final ListenableFuture<?> listenableFuture = initialized.get()
584 ? transactionChainManager.deactivateTransactionManager()
585 : Futures.immediateFuture(null);
587 hashedWheelTimer.newTimeout((timerTask) -> {
588 if (!listenableFuture.isDone() && !listenableFuture.isCancelled()) {
589 listenableFuture.cancel(true);
591 }, TX_CHAIN_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS);
593 return listenableFuture;
597 public DeviceInfo getDeviceInfo() {
598 return this.deviceInfo;
602 public void registerMastershipWatcher(@NonNull final ContextChainMastershipWatcher newWatcher) {
603 this.contextChainMastershipWatcher = newWatcher;
608 public ServiceGroupIdentifier getIdentifier() {
609 return deviceInfo.getServiceIdentifier();
613 public void acquireWriteTransactionLock() {
614 transactionChainManager.acquireWriteTransactionLock();
618 public void releaseWriteTransactionLock() {
619 transactionChainManager.releaseWriteTransactionLock();
623 public void close() {
624 // Close all datastore registries and transactions
625 if (initialized.getAndSet(false)) {
626 deviceGroupRegistry.close();
627 deviceFlowRegistry.close();
628 deviceMeterRegistry.close();
630 final ListenableFuture<?> txChainShuttingDown = transactionChainManager.shuttingDown();
632 Futures.addCallback(txChainShuttingDown, new FutureCallback<Object>() {
634 public void onSuccess(final Object result) {
635 transactionChainManager.close();
636 transactionChainManager = null;
640 public void onFailure(final Throwable throwable) {
641 transactionChainManager.close();
642 transactionChainManager = null;
644 }, MoreExecutors.directExecutor());
647 requestContexts.forEach(requestContext -> RequestContextUtil
648 .closeRequestContextWithRpcError(requestContext, "Connection closed."));
649 requestContexts.clear();
653 public boolean canUseSingleLayerSerialization() {
654 return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
658 public void instantiateServiceInstance() {
659 lazyTransactionManagerInitialization();
662 // TODO: exception handling should be fixed by using custom checked exception, never RuntimeExceptions
664 @SuppressWarnings({"checkstyle:IllegalCatch"})
665 public void initializeDevice() {
666 LOG.debug("Device initialization started for device {}", deviceInfo);
668 final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
669 .retrieveAndClearPortStatusMessages();
670 portStatusMessages.forEach(this::writePortStatusMessage);
672 } catch (final Exception ex) {
673 throw new RuntimeException(String.format("Error processing port status messages from device %s: %s",
674 deviceInfo.toString(), ex.toString()), ex);
677 final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
678 .lookup(deviceInfo.getVersion());
680 if (initializer.isPresent()) {
681 final Future<Void> initialize = initializer
683 .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor);
686 initialize.get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
687 } catch (TimeoutException ex) {
688 initialize.cancel(true);
689 throw new RuntimeException(String.format("Failed to initialize device %s in %ss: %s",
690 deviceInfo.toString(), String.valueOf(DEVICE_INIT_TIMEOUT / 1000), ex.toString()), ex);
691 } catch (ExecutionException | InterruptedException ex) {
692 throw new RuntimeException(
693 String.format("Device %s cannot be initialized: %s", deviceInfo.toString(), ex.toString()), ex);
696 throw new RuntimeException(String.format("Unsupported version %s for device %s",
697 deviceInfo.getVersion(),
698 deviceInfo.toString()));
701 final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill =
702 getDeviceFlowRegistry().fill();
703 Futures.addCallback(deviceFlowRegistryFill,
704 new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher),
705 MoreExecutors.directExecutor());
709 void lazyTransactionManagerInitialization() {
710 if (!this.initialized.get()) {
711 if (LOG.isDebugEnabled()) {
712 LOG.debug("Transaction chain manager for node {} created", deviceInfo);
714 this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo.getNodeId().getValue());
715 this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker,
716 deviceInfo.getNodeInstanceIdentifier());
717 this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
718 this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
721 transactionChainManager.activateTransactionManager();
722 initialized.set(true);
727 public <T> RequestContext<T> createRequestContext() {
728 final Long xid = deviceInfo.reserveXidForDeviceMessage();
730 final AbstractRequestContext<T> abstractRequestContext = new AbstractRequestContext<T>(xid) {
732 public void close() {
733 requestContexts.remove(this);
737 requestContexts.add(abstractRequestContext);
738 return abstractRequestContext;
742 public void onStateAcquired(final ContextChainState state) {
746 private class DeviceFlowRegistryCallback implements FutureCallback<List<Optional<FlowCapableNode>>> {
747 private final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill;
748 private final ContextChainMastershipWatcher contextChainMastershipWatcher;
750 DeviceFlowRegistryCallback(
751 ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill,
752 ContextChainMastershipWatcher contextChainMastershipWatcher) {
753 this.deviceFlowRegistryFill = deviceFlowRegistryFill;
754 this.contextChainMastershipWatcher = contextChainMastershipWatcher;
758 public void onSuccess(List<Optional<FlowCapableNode>> result) {
759 if (LOG.isDebugEnabled()) {
760 // Count all flows we read from datastore for debugging purposes.
761 // This number do not always represent how many flows were actually added
762 // to DeviceFlowRegistry, because of possible duplicates.
764 if (result != null) {
765 for (Optional<FlowCapableNode> optNode : result) {
766 if (optNode.isPresent()) {
767 flowCount += optNode.get().nonnullTable().stream()
768 .filter(Objects::nonNull)
769 .flatMap(table -> table.nonnullFlow().stream())
770 .filter(Objects::nonNull)
776 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo);
781 public void onFailure(Throwable throwable) {
782 if (deviceFlowRegistryFill.isCancelled()) {
783 if (LOG.isDebugEnabled()) {
784 LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo);
787 LOG.warn("Failed filling flow registry with flows for node: {}", deviceInfo, throwable);
789 contextChainMastershipWatcher.onNotAbleToStartMastership(
791 "Was not able to fill flow registry on device",