2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import io.netty.util.HashedWheelTimer;
17 import java.util.Collection;
18 import java.util.List;
19 import java.util.Objects;
20 import java.util.Optional;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.opendaylight.mdsal.binding.api.DataBroker;
29 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
30 import org.opendaylight.mdsal.binding.api.ReadTransaction;
31 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
32 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
33 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
34 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
35 import org.opendaylight.openflowplugin.api.OFConstants;
36 import org.opendaylight.openflowplugin.api.openflow.FlowGroupInfoHistory;
37 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
41 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
42 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
43 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
44 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
45 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
46 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder;
48 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
49 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
50 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
51 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
52 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
53 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
54 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
55 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
56 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
57 import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
58 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
59 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
60 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
61 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
62 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
63 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
64 import org.opendaylight.openflowplugin.impl.device.history.FlowGroupInfoHistoryImpl;
65 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
66 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
67 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
68 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
69 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
70 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
71 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
72 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
73 import org.opendaylight.openflowplugin.impl.util.MatchUtil;
74 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
75 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.Match;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketIn;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
101 import org.opendaylight.yangtools.util.concurrent.NotificationManager;
102 import org.opendaylight.yangtools.yang.binding.DataContainer;
103 import org.opendaylight.yangtools.yang.binding.DataObject;
104 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
105 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
106 import org.opendaylight.yangtools.yang.common.Uint32;
107 import org.slf4j.Logger;
108 import org.slf4j.LoggerFactory;
110 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper {
111 // FIXME: make this configurable and expose a different implementation at least for OSGi when this is switched off
112 private static final int FLOWGROUP_CACHE_SIZE = 10000;
114 private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
115 private static final Logger OF_EVENT_LOG = LoggerFactory.getLogger("OfEventLog");
117 // TODO: drain factor should be parametrized
118 private static final float REJECTED_DRAIN_FACTOR = 0.25f;
119 // TODO: low water mark factor should be parametrized
120 private static final float LOW_WATERMARK_FACTOR = 0.75f;
121 // TODO: high water mark factor should be parametrized
122 private static final float HIGH_WATERMARK_FACTOR = 0.95f;
124 // Timeout in milliseconds after what we will give up on initializing device
125 private static final int DEVICE_INIT_TIMEOUT = 9000;
127 // Timeout in milliseconds after what we will give up on closing transaction chain
128 private static final int TX_CHAIN_CLOSE_TIMEOUT = 10000;
130 private static final int LOW_WATERMARK = 1000;
131 private static final int HIGH_WATERMARK = 2000;
133 private final MultipartWriterProvider writerProvider;
134 private final HashedWheelTimer hashedWheelTimer;
135 private final DeviceState deviceState;
136 private final DataBroker dataBroker;
137 private final Collection<RequestContext<?>> requestContexts = ConcurrentHashMap.newKeySet();
138 private final MessageSpy messageSpy;
139 private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
140 private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
141 private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight
142 .flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
143 private final TranslatorLibrary translatorLibrary;
144 private final ConvertorExecutor convertorExecutor;
145 private final DeviceInitializerProvider deviceInitializerProvider;
146 private final PacketInRateLimiter packetInLimiter;
147 private final DeviceInfo deviceInfo;
148 private final ConnectionContext primaryConnectionContext;
149 private final boolean skipTableFeatures;
150 private final boolean switchFeaturesMandatory;
151 private final boolean isFlowRemovedNotificationOn;
152 private final boolean useSingleLayerSerialization;
153 private final AtomicBoolean initialized = new AtomicBoolean(false);
154 private final AtomicBoolean hasState = new AtomicBoolean(false);
155 private final AtomicBoolean isInitialTransactionSubmitted = new AtomicBoolean(false);
156 private final ContextChainHolder contextChainHolder;
157 private NotificationPublishService notificationPublishService;
158 private TransactionChainManager transactionChainManager;
159 private DeviceFlowRegistry deviceFlowRegistry;
160 private DeviceGroupRegistry deviceGroupRegistry;
161 private DeviceMeterRegistry deviceMeterRegistry;
162 private ExtensionConverterProvider extensionConverterProvider;
163 private ContextChainMastershipWatcher contextChainMastershipWatcher;
164 private FlowGroupInfoHistoryImpl history;
165 private final NotificationManager<String, Runnable> queuedNotificationManager;
166 private final boolean isStatisticsPollingOn;
168 DeviceContextImpl(@NonNull final ConnectionContext primaryConnectionContext,
169 @NonNull final DataBroker dataBroker,
170 @NonNull final MessageSpy messageSpy,
171 @NonNull final TranslatorLibrary translatorLibrary,
172 final ConvertorExecutor convertorExecutor,
173 final boolean skipTableFeatures,
174 final HashedWheelTimer hashedWheelTimer,
175 final boolean useSingleLayerSerialization,
176 final DeviceInitializerProvider deviceInitializerProvider,
177 final boolean isFlowRemovedNotificationOn,
178 final boolean switchFeaturesMandatory,
179 final ContextChainHolder contextChainHolder,
180 final NotificationManager<String, Runnable> queuedNotificationManager,
181 final boolean isStatisticsPollingOn) {
182 this.primaryConnectionContext = primaryConnectionContext;
183 deviceInfo = primaryConnectionContext.getDeviceInfo();
184 this.hashedWheelTimer = hashedWheelTimer;
185 this.deviceInitializerProvider = deviceInitializerProvider;
186 this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
187 this.switchFeaturesMandatory = switchFeaturesMandatory;
188 deviceState = new DeviceStateImpl();
189 this.dataBroker = dataBroker;
190 this.messageSpy = messageSpy;
191 this.isStatisticsPollingOn = isStatisticsPollingOn;
192 this.contextChainHolder = contextChainHolder;
194 packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
195 /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
197 this.translatorLibrary = translatorLibrary;
198 portStatusTranslator = translatorLibrary.lookupTranslator(
199 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
200 packetInTranslator = translatorLibrary.lookupTranslator(
201 new TranslatorKey(deviceInfo.getVersion(), org.opendaylight.yang.gen.v1.urn.opendaylight.openflow
203 .PacketIn.class.getName()));
204 flowRemovedTranslator = translatorLibrary.lookupTranslator(
205 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
207 this.convertorExecutor = convertorExecutor;
208 this.skipTableFeatures = skipTableFeatures;
209 this.useSingleLayerSerialization = useSingleLayerSerialization;
210 this.queuedNotificationManager = queuedNotificationManager;
211 writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
215 public boolean initialSubmitTransaction() {
216 if (!initialized.get()) {
220 final boolean initialSubmit = transactionChainManager.initialSubmitWriteTransaction();
221 isInitialTransactionSubmitted.set(initialSubmit);
222 return initialSubmit;
226 public DeviceState getDeviceState() {
231 public ReadTransaction getReadTransaction() {
232 return dataBroker.newReadOnlyTransaction();
236 public boolean isTransactionsEnabled() {
237 return isInitialTransactionSubmitted.get();
241 public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
242 final InstanceIdentifier<T> path,
244 if (initialized.get()) {
245 transactionChainManager.writeToTransaction(store, path, data, false);
250 public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
251 final InstanceIdentifier<T> path,
253 if (initialized.get()) {
254 transactionChainManager.writeToTransaction(store, path, data, true);
259 public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store,
260 final InstanceIdentifier<T> path) {
261 if (initialized.get()) {
262 transactionChainManager.addDeleteOperationToTxChain(store, path);
267 public boolean submitTransaction() {
268 return initialized.get() && transactionChainManager.submitTransaction();
272 public boolean syncSubmitTransaction() {
273 return initialized.get() && transactionChainManager.submitTransaction(true);
277 public ConnectionContext getPrimaryConnectionContext() {
278 return primaryConnectionContext;
282 public DeviceFlowRegistry getDeviceFlowRegistry() {
283 return deviceFlowRegistry;
287 public DeviceGroupRegistry getDeviceGroupRegistry() {
288 return deviceGroupRegistry;
292 public boolean isStatisticsPollingOn() {
293 return isStatisticsPollingOn;
297 public DeviceMeterRegistry getDeviceMeterRegistry() {
298 return deviceMeterRegistry;
302 public void processReply(final OfHeader ofHeader) {
303 messageSpy.spyMessage(
304 ofHeader.implementedInterface(),
305 ofHeader instanceof Error
306 ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
307 : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
311 public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
312 ofHeaderList.forEach(header -> messageSpy.spyMessage(
313 header.implementedInterface(),
314 header instanceof Error
315 ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
316 : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
320 public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
321 if (isMasterOfDevice()) {
322 //1. translate to general flow (table, priority, match, cookie)
323 final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819
324 .FlowRemoved flowRemovedNotification = flowRemovedTranslator
325 .translate(flowRemoved, deviceInfo, null);
327 if (isFlowRemovedNotificationOn) {
328 // Trigger off a notification
329 notificationPublishService.offerNotification(flowRemovedNotification);
332 LOG.debug("Controller is not owner of the device {}, skipping Flow Removed message",
333 deviceInfo.getLOGValue());
338 @SuppressWarnings("checkstyle:IllegalCatch")
339 public void processPortStatusMessage(final PortStatusMessage portStatus) {
340 messageSpy.spyMessage(portStatus.implementedInterface(), MessageSpy.StatisticsGroup
341 .FROM_SWITCH_PUBLISHED_SUCCESS);
343 if (initialized.get()) {
345 writePortStatusMessage(portStatus);
346 } catch (final Exception e) {
347 LOG.warn("Error processing port status message for port {} on device {}",
348 portStatus.getPortNo(), getDeviceInfo(), e);
350 } else if (!hasState.get()) {
351 primaryConnectionContext.handlePortStatusMessage(portStatus);
355 @SuppressWarnings("checkstyle:IllegalCatch")
356 private void writePortStatusMessage(final PortStatus portStatusMessage) {
357 String datapathId = deviceInfo.getDatapathId().toString().intern();
358 queuedNotificationManager.submitNotification(datapathId, () -> {
360 acquireWriteTransactionLock();
361 final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
362 .translate(portStatusMessage, getDeviceInfo(), null);
363 OF_EVENT_LOG.debug("Node Connector Status, Node: {}, PortNumber: {}, PortName: {}, Reason: {}",
364 deviceInfo.getDatapathId(), portStatusMessage.getPortNo(), portStatusMessage.getName(),
365 portStatusMessage.getReason());
367 final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
368 .getNodeInstanceIdentifier()
369 .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
370 .nodeConnectorIdfromDatapathPortNo(
371 deviceInfo.getDatapathId(),
372 portStatusMessage.getPortNo(),
373 OpenflowVersion.get(deviceInfo.getVersion()))));
375 writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
376 .withKey(iiToNodeConnector.getKey())
377 .addAugmentation(new FlowCapableNodeConnectorStatisticsDataBuilder().build())
378 .addAugmentation(flowCapableNodeConnector)
380 syncSubmitTransaction();
381 if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
382 addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
383 syncSubmitTransaction();
385 } catch (final Exception e) {
386 LOG.warn("Error processing port status message for port {} on device {}",
387 portStatusMessage.getPortNo(), datapathId, e);
389 releaseWriteTransactionLock();
395 public void processPacketInMessage(final PacketInMessage packetInMessage) {
396 if (isMasterOfDevice()) {
397 final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
398 handlePacketInMessage(packetReceived, packetInMessage.implementedInterface(), packetReceived.getMatch());
400 LOG.debug("Controller is not owner of the device {}, skipping packet_in message", deviceInfo.getLOGValue());
404 private Boolean isMasterOfDevice() {
405 final ContextChain contextChain = contextChainHolder.getContextChain(deviceInfo);
406 boolean result = false;
407 if (contextChain != null) {
408 result = contextChain.isMastered(ContextChainMastershipState.CHECK, false);
413 private void handlePacketInMessage(final PacketIn packetIn,
414 final Class<?> implementedInterface,
416 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH);
417 final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
419 if (packetIn == null) {
420 LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
421 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
425 final OpenflowVersion openflowVersion = OpenflowVersion.get(deviceInfo.getVersion());
427 // Try to get ingress from match
428 final NodeConnectorRef nodeConnectorRef = packetIn.getIngress() != null
429 ? packetIn.getIngress() : Optional.ofNullable(match)
430 .map(Match::getInPort)
431 .map(nodeConnectorId -> InventoryDataServiceUtil
432 .portNumberfromNodeConnectorId(
435 .map(portNumber -> InventoryDataServiceUtil
436 .nodeConnectorRefFromDatapathIdPortno(
437 deviceInfo.getDatapathId(),
442 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
444 if (!packetInLimiter.acquirePermit()) {
445 LOG.debug("Packet limited");
446 // TODO: save packet into emergency slot if possible
447 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
448 .FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
452 final ListenableFuture<?> offerNotification = notificationPublishService
453 .offerNotification(new PacketReceivedBuilder(packetIn)
454 .setIngress(nodeConnectorRef)
455 .setMatch(MatchUtil.transformMatch(match,
456 org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.packet.received
460 if (NotificationPublishService.REJECTED.equals(offerNotification)) {
461 LOG.debug("notification offer rejected");
462 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
463 packetInLimiter.drainLowWaterMark();
464 packetInLimiter.releasePermit();
468 Futures.addCallback(offerNotification, new FutureCallback<Object>() {
470 public void onSuccess(final Object result) {
471 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
472 packetInLimiter.releasePermit();
476 public void onFailure(final Throwable throwable) {
477 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
478 .FROM_SWITCH_NOTIFICATION_REJECTED);
479 LOG.debug("notification offer failed: {}", throwable.getMessage());
480 LOG.trace("notification offer failed..", throwable);
481 packetInLimiter.releasePermit();
483 }, MoreExecutors.directExecutor());
487 public void processExperimenterMessage(final ExperimenterMessage notification) {
488 if (isMasterOfDevice()) {
490 final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
491 final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
492 getDeviceInfo().getVersion(),
493 (Class<? extends ExperimenterDataOfChoice>) vendorData.implementedInterface());
494 final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter =
495 extensionConverterProvider.getMessageConverter(key);
496 if (messageConverter == null) {
497 LOG.warn("custom converter for {}[OF:{}] not found",
498 notification.getExperimenterDataOfChoice().implementedInterface(),
499 getDeviceInfo().getVersion());
502 // build notification
503 final ExperimenterMessageOfChoice messageOfChoice;
504 messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
505 final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new
506 ExperimenterMessageFromDevBuilder()
507 .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
508 .setExperimenterMessageOfChoice(messageOfChoice);
510 notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
512 LOG.debug("Controller is not owner of the device {}, skipping experimenter message",
513 deviceInfo.getLOGValue());
518 // The cast to PacketInMessage is safe as the implemented interface is verified before the cas tbut FB doesn't
520 @SuppressFBWarnings("BC_UNCONFIRMED_CAST")
521 public boolean processAlienMessage(final OfHeader message) {
522 final Class<? extends DataContainer> implementedInterface = message.implementedInterface();
524 if (org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketInMessage.class
525 .equals(implementedInterface)) {
526 final org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
527 .PacketInMessage packetInMessage = (org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
528 .rev130709.PacketInMessage) message;
530 handlePacketInMessage(packetInMessage, implementedInterface, packetInMessage.getMatch());
538 public TranslatorLibrary oook() {
539 return translatorLibrary;
543 public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
544 this.notificationPublishService = notificationPublishService;
548 public MessageSpy getMessageSpy() {
553 public void onPublished() {
554 primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
558 public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>>
560 return new MultiMsgCollectorImpl<>(this, requestContext);
564 public void updatePacketInRateLimit(final long upperBound) {
565 packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound),
566 (int) (HIGH_WATERMARK_FACTOR * upperBound));
570 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
571 this.extensionConverterProvider = extensionConverterProvider;
575 public ExtensionConverterProvider getExtensionConverterProvider() {
576 return extensionConverterProvider;
580 TransactionChainManager getTransactionChainManager() {
581 return transactionChainManager;
585 public ListenableFuture<?> closeServiceInstance() {
586 final ListenableFuture<?> listenableFuture = initialized.get()
587 ? transactionChainManager.deactivateTransactionManager()
588 : Futures.immediateFuture(null);
590 hashedWheelTimer.newTimeout(timerTask -> {
591 if (!listenableFuture.isDone() && !listenableFuture.isCancelled()) {
592 listenableFuture.cancel(true);
594 }, TX_CHAIN_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS);
596 return listenableFuture;
600 public DeviceInfo getDeviceInfo() {
605 public void registerMastershipWatcher(@NonNull final ContextChainMastershipWatcher newWatcher) {
606 contextChainMastershipWatcher = newWatcher;
610 public ServiceGroupIdentifier getIdentifier() {
611 return deviceInfo.getServiceIdentifier();
615 public void acquireWriteTransactionLock() {
616 transactionChainManager.acquireWriteTransactionLock();
620 public void releaseWriteTransactionLock() {
621 transactionChainManager.releaseWriteTransactionLock();
625 public void close() {
626 // Close all datastore registries and transactions
627 if (initialized.getAndSet(false)) {
628 deviceGroupRegistry.close();
629 deviceFlowRegistry.close();
630 deviceMeterRegistry.close();
632 final ListenableFuture<?> txChainShuttingDown = transactionChainManager.shuttingDown();
634 Futures.addCallback(txChainShuttingDown, new FutureCallback<Object>() {
636 public void onSuccess(final Object result) {
637 transactionChainManager.close();
638 transactionChainManager = null;
642 public void onFailure(final Throwable throwable) {
643 transactionChainManager.close();
644 transactionChainManager = null;
646 }, MoreExecutors.directExecutor());
649 requestContexts.forEach(requestContext -> RequestContextUtil
650 .closeRequestContextWithRpcError(requestContext, "Connection closed."));
651 requestContexts.clear();
655 public boolean canUseSingleLayerSerialization() {
656 return useSingleLayerSerialization && OFConstants.OFP_VERSION_1_3.compareTo(getDeviceInfo().getVersion()) <= 0;
660 public void instantiateServiceInstance() {
661 lazyTransactionManagerInitialization();
664 // TODO: exception handling should be fixed by using custom checked exception, never RuntimeExceptions
666 @SuppressWarnings({"checkstyle:IllegalCatch"})
667 public void initializeDevice() {
668 LOG.debug("Device initialization started for device {}", deviceInfo);
670 final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
671 .retrieveAndClearPortStatusMessages();
672 portStatusMessages.forEach(this::writePortStatusMessage);
674 } catch (final Exception ex) {
675 throw new IllegalStateException(String.format("Error processing port status messages from device %s: %s",
676 deviceInfo.toString(), ex.toString()), ex);
679 final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
680 .lookup(deviceInfo.getVersion());
682 if (initializer.isPresent()) {
683 final Future<Void> initialize = initializer.orElseThrow()
684 .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor);
687 initialize.get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
688 } catch (TimeoutException ex) {
689 initialize.cancel(true);
690 throw new IllegalStateException(String.format("Failed to initialize device %s in %ss: %s",
691 deviceInfo.toString(), String.valueOf(DEVICE_INIT_TIMEOUT / 1000), ex.toString()), ex);
692 } catch (ExecutionException | InterruptedException ex) {
693 throw new IllegalStateException(
694 String.format("Device %s cannot be initialized: %s", deviceInfo.toString(), ex.toString()), ex);
697 throw new IllegalStateException(String.format("Unsupported version %s for device %s",
698 deviceInfo.getVersion(),
699 deviceInfo.toString()));
702 final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill =
703 getDeviceFlowRegistry().fill();
704 Futures.addCallback(deviceFlowRegistryFill,
705 new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher),
706 MoreExecutors.directExecutor());
710 void lazyTransactionManagerInitialization() {
711 if (!initialized.get()) {
712 LOG.debug("Transaction chain manager for node {} created", deviceInfo);
713 final NodeId nodeId = deviceInfo.getNodeId();
714 transactionChainManager = new TransactionChainManager(dataBroker, nodeId.getValue());
715 history = new FlowGroupInfoHistoryImpl(FLOWGROUP_CACHE_SIZE);
716 deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker,
717 deviceInfo.getNodeInstanceIdentifier(), history);
718 deviceGroupRegistry = new DeviceGroupRegistryImpl(history);
719 deviceMeterRegistry = new DeviceMeterRegistryImpl();
722 transactionChainManager.activateTransactionManager();
723 initialized.set(true);
727 public <T> RequestContext<T> createRequestContext() {
728 final Uint32 xid = deviceInfo.reserveXidForDeviceMessage();
730 final AbstractRequestContext<T> abstractRequestContext = new AbstractRequestContext<>(xid) {
732 public void close() {
733 requestContexts.remove(this);
737 requestContexts.add(abstractRequestContext);
738 return abstractRequestContext;
742 public void onStateAcquired(final ContextChainState state) {
747 public FlowGroupInfoHistory getFlowGroupInfoHistory() {
751 private class DeviceFlowRegistryCallback implements FutureCallback<List<Optional<FlowCapableNode>>> {
752 private final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill;
753 private final ContextChainMastershipWatcher contextChainMastershipWatcher;
755 DeviceFlowRegistryCallback(
756 final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill,
757 final ContextChainMastershipWatcher contextChainMastershipWatcher) {
758 this.deviceFlowRegistryFill = deviceFlowRegistryFill;
759 this.contextChainMastershipWatcher = contextChainMastershipWatcher;
763 public void onSuccess(final List<Optional<FlowCapableNode>> result) {
764 if (LOG.isDebugEnabled()) {
765 // Count all flows we read from datastore for debugging purposes.
766 // This number do not always represent how many flows were actually added
767 // to DeviceFlowRegistry, because of possible duplicates.
769 if (result != null) {
770 for (Optional<FlowCapableNode> optNode : result) {
771 if (optNode.isPresent()) {
772 flowCount += optNode.orElseThrow().nonnullTable().values().stream()
773 .filter(Objects::nonNull)
774 .flatMap(table -> table.nonnullFlow().values().stream())
775 .filter(Objects::nonNull)
781 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo);
786 public void onFailure(final Throwable throwable) {
787 if (deviceFlowRegistryFill.isCancelled()) {
788 if (LOG.isDebugEnabled()) {
789 LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo);
792 LOG.warn("Failed filling flow registry with flows for node: {}", deviceInfo, throwable);
794 contextChainMastershipWatcher.onNotAbleToStartMastership(
796 "Was not able to fill flow registry on device",