2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import io.netty.util.HashedWheelTimer;
17 import java.util.Collection;
18 import java.util.List;
19 import java.util.Objects;
20 import java.util.Optional;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.eclipse.jdt.annotation.Nullable;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
31 import org.opendaylight.mdsal.binding.api.ReadTransaction;
32 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
33 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
34 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
35 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
36 import org.opendaylight.openflowplugin.api.OFConstants;
37 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
41 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
42 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
43 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
44 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
45 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
46 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder;
48 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
49 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
50 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
51 import org.opendaylight.openflowplugin.api.openflow.lifecycle.DeviceInitializationContext;
52 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
53 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
54 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
55 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
56 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
57 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
58 import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
59 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
60 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
61 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
62 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
63 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
64 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
65 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
66 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
67 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
68 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
69 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
70 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
71 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
72 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
73 import org.opendaylight.openflowplugin.impl.util.MatchUtil;
74 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
75 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.Match;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketIn;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
101 import org.opendaylight.yangtools.util.concurrent.NotificationManager;
102 import org.opendaylight.yangtools.yang.binding.DataContainer;
103 import org.opendaylight.yangtools.yang.binding.DataObject;
104 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
105 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
106 import org.slf4j.Logger;
107 import org.slf4j.LoggerFactory;
109 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper, DeviceInitializationContext {
111 private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
112 private static final Logger OF_EVENT_LOG = LoggerFactory.getLogger("OfEventLog");
114 // TODO: drain factor should be parametrized
115 private static final float REJECTED_DRAIN_FACTOR = 0.25f;
116 // TODO: low water mark factor should be parametrized
117 private static final float LOW_WATERMARK_FACTOR = 0.75f;
118 // TODO: high water mark factor should be parametrized
119 private static final float HIGH_WATERMARK_FACTOR = 0.95f;
121 // Timeout in milliseconds after what we will give up on initializing device
122 private static final int DEVICE_INIT_TIMEOUT = 9000;
124 // Timeout in milliseconds after what we will give up on closing transaction chain
125 private static final int TX_CHAIN_CLOSE_TIMEOUT = 10000;
127 private static final int LOW_WATERMARK = 1000;
128 private static final int HIGH_WATERMARK = 2000;
130 private final MultipartWriterProvider writerProvider;
131 private final HashedWheelTimer hashedWheelTimer;
132 private final DeviceState deviceState;
133 private final DataBroker dataBroker;
134 private final Collection<RequestContext<?>> requestContexts = ConcurrentHashMap.newKeySet();
135 private final MessageSpy messageSpy;
136 private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
137 private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
138 private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight
139 .flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
140 private final TranslatorLibrary translatorLibrary;
141 private final ConvertorExecutor convertorExecutor;
142 private final DeviceInitializerProvider deviceInitializerProvider;
143 private final PacketInRateLimiter packetInLimiter;
144 private final DeviceInfo deviceInfo;
145 private final ConnectionContext primaryConnectionContext;
146 private final boolean skipTableFeatures;
147 private final boolean switchFeaturesMandatory;
148 private final boolean isFlowRemovedNotificationOn;
149 private final boolean useSingleLayerSerialization;
150 private final AtomicBoolean initialized = new AtomicBoolean(false);
151 private final AtomicBoolean hasState = new AtomicBoolean(false);
152 private final AtomicBoolean isInitialTransactionSubmitted = new AtomicBoolean(false);
153 private final ContextChainHolder contextChainHolder;
154 private NotificationPublishService notificationPublishService;
155 private TransactionChainManager transactionChainManager;
156 private DeviceFlowRegistry deviceFlowRegistry;
157 private DeviceGroupRegistry deviceGroupRegistry;
158 private DeviceMeterRegistry deviceMeterRegistry;
159 private ExtensionConverterProvider extensionConverterProvider;
160 private ContextChainMastershipWatcher contextChainMastershipWatcher;
161 private final NotificationManager<String, Runnable> queuedNotificationManager;
162 private final boolean isStatisticsPollingOn;
/**
 * Builds the device context around an already established primary connection.
 *
 * <p>Device info is taken from the connection context, the three message translators are looked up in the
 * translator library for the device's OpenFlow version, and a packet-in rate limiter is created with the
 * initial water marks. The transaction chain manager and the device registries are NOT created here; see
 * {@code lazyTransactionManagerInitialization()}.
 *
 * @param primaryConnectionContext connection context of the primary connection
 * @param dataBroker data broker used for read transactions and the transaction chain
 * @param messageSpy message statistics collector
 * @param translatorLibrary source of the port-status, packet-in and flow-removed translators
 * @param convertorExecutor convertor executor handed to the device initializer
 * @param skipTableFeatures flag handed to the device initializer
 * @param hashedWheelTimer timer used to bound the transaction chain shutdown
 * @param useSingleLayerSerialization whether single layer serialization was requested
 * @param deviceInitializerProvider lookup of version specific device initializers
 * @param isFlowRemovedNotificationOn whether flow-removed notifications are published
 * @param switchFeaturesMandatory flag handed to the device initializer
 * @param contextChainHolder holder queried for the mastership state of this device
 * @param queuedNotificationManager manager that runs queued port-status write tasks
 * @param isStatisticsPollingOn value reported by {@code isStatisticsPollingOn()}
 */
DeviceContextImpl(@NonNull final ConnectionContext primaryConnectionContext,
                  @NonNull final DataBroker dataBroker,
                  @NonNull final MessageSpy messageSpy,
                  @NonNull final TranslatorLibrary translatorLibrary,
                  final ConvertorExecutor convertorExecutor,
                  final boolean skipTableFeatures,
                  final HashedWheelTimer hashedWheelTimer,
                  final boolean useSingleLayerSerialization,
                  final DeviceInitializerProvider deviceInitializerProvider,
                  final boolean isFlowRemovedNotificationOn,
                  final boolean switchFeaturesMandatory,
                  final ContextChainHolder contextChainHolder,
                  final NotificationManager queuedNotificationManager,
                  final boolean isStatisticsPollingOn) {
    this.primaryConnectionContext = primaryConnectionContext;
    this.deviceInfo = primaryConnectionContext.getDeviceInfo();
    this.hashedWheelTimer = hashedWheelTimer;
    this.deviceInitializerProvider = deviceInitializerProvider;
    this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
    this.switchFeaturesMandatory = switchFeaturesMandatory;
    this.deviceState = new DeviceStateImpl();
    this.dataBroker = dataBroker;
    this.messageSpy = messageSpy;
    this.isStatisticsPollingOn = isStatisticsPollingOn;
    this.contextChainHolder = contextChainHolder;

    this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
            /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);

    this.translatorLibrary = translatorLibrary;
    this.portStatusTranslator = translatorLibrary.lookupTranslator(
            new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
    // NOTE(review): the ".protocol.rev130731" package segment was missing from the listing (the qualified
    // name was split across a dropped line) and has been reconstructed -- confirm against the registered key.
    this.packetInTranslator = translatorLibrary.lookupTranslator(
            new TranslatorKey(deviceInfo.getVersion(), org.opendaylight.yang.gen.v1.urn.opendaylight.openflow
                    .protocol.rev130731
                    .PacketIn.class.getName()));
    this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
            new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));

    this.convertorExecutor = convertorExecutor;
    this.skipTableFeatures = skipTableFeatures;
    this.useSingleLayerSerialization = useSingleLayerSerialization;
    // The parameter is a raw NotificationManager, so this assignment is unchecked.
    this.queuedNotificationManager = queuedNotificationManager;
    // Kept last: passes 'this' out of the constructor, so every other field is assigned first.
    writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
}
211 public boolean initialSubmitTransaction() {
212 if (!initialized.get()) {
216 final boolean initialSubmit = transactionChainManager.initialSubmitWriteTransaction();
217 isInitialTransactionSubmitted.set(initialSubmit);
218 return initialSubmit;
222 public DeviceState getDeviceState() {
227 public ReadTransaction getReadTransaction() {
228 return dataBroker.newReadOnlyTransaction();
232 public boolean isTransactionsEnabled() {
233 return isInitialTransactionSubmitted.get();
237 public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
238 final InstanceIdentifier<T> path,
240 if (initialized.get()) {
241 transactionChainManager.writeToTransaction(store, path, data, false);
246 public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
247 final InstanceIdentifier<T> path,
249 if (initialized.get()) {
250 transactionChainManager.writeToTransaction(store, path, data, true);
255 public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store,
256 final InstanceIdentifier<T> path) {
257 if (initialized.get()) {
258 transactionChainManager.addDeleteOperationToTxChain(store, path);
263 public boolean submitTransaction() {
264 return initialized.get() && transactionChainManager.submitTransaction();
268 public boolean syncSubmitTransaction() {
269 return initialized.get() && transactionChainManager.submitTransaction(true);
273 public ConnectionContext getPrimaryConnectionContext() {
274 return primaryConnectionContext;
278 public DeviceFlowRegistry getDeviceFlowRegistry() {
279 return deviceFlowRegistry;
283 public DeviceGroupRegistry getDeviceGroupRegistry() {
284 return deviceGroupRegistry;
288 public boolean isStatisticsPollingOn() {
289 return isStatisticsPollingOn;
293 public DeviceMeterRegistry getDeviceMeterRegistry() {
294 return deviceMeterRegistry;
298 public void processReply(final OfHeader ofHeader) {
299 messageSpy.spyMessage(
300 ofHeader.implementedInterface(),
301 ofHeader instanceof Error
302 ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
303 : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
307 public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
308 ofHeaderList.forEach(header -> messageSpy.spyMessage(
309 header.implementedInterface(),
310 header instanceof Error
311 ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
312 : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
316 public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
317 if (isMasterOfDevice()) {
318 //1. translate to general flow (table, priority, match, cookie)
319 final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819
320 .FlowRemoved flowRemovedNotification = flowRemovedTranslator
321 .translate(flowRemoved, deviceInfo, null);
323 if (isFlowRemovedNotificationOn) {
324 // Trigger off a notification
325 notificationPublishService.offerNotification(flowRemovedNotification);
328 LOG.debug("Controller is not owner of the device {}, skipping Flow Removed message",
329 deviceInfo.getLOGValue());
334 @SuppressWarnings("checkstyle:IllegalCatch")
335 public void processPortStatusMessage(final PortStatusMessage portStatus) {
336 messageSpy.spyMessage(portStatus.implementedInterface(), MessageSpy.StatisticsGroup
337 .FROM_SWITCH_PUBLISHED_SUCCESS);
339 if (initialized.get()) {
341 writePortStatusMessage(portStatus);
342 } catch (final Exception e) {
343 LOG.warn("Error processing port status message for port {} on device {}",
344 portStatus.getPortNo(), getDeviceInfo(), e);
346 } else if (!hasState.get()) {
347 primaryConnectionContext.handlePortStatusMessage(portStatus);
351 @SuppressWarnings("checkstyle:IllegalCatch")
352 private void writePortStatusMessage(final PortStatus portStatusMessage) {
353 String datapathId = deviceInfo.getDatapathId().toString().intern();
354 queuedNotificationManager.submitNotification(datapathId, () -> {
356 final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
357 .translate(portStatusMessage, getDeviceInfo(), null);
358 OF_EVENT_LOG.debug("Node Connector Status, Node: {}, PortNumber: {}, PortName: {}, Reason: {}",
359 deviceInfo.getDatapathId(), portStatusMessage.getPortNo(), portStatusMessage.getName(),
360 portStatusMessage.getReason());
362 final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
363 .getNodeInstanceIdentifier()
364 .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
365 .nodeConnectorIdfromDatapathPortNo(
366 deviceInfo.getDatapathId(),
367 portStatusMessage.getPortNo(),
368 OpenflowVersion.get(deviceInfo.getVersion()))));
370 writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
371 .withKey(iiToNodeConnector.getKey())
372 .addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new
373 FlowCapableNodeConnectorStatisticsDataBuilder().build())
374 .addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector)
376 syncSubmitTransaction();
377 if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
378 addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
379 syncSubmitTransaction();
381 } catch (final Exception e) {
382 LOG.warn("Error processing port status message for port {} on device {}",
383 portStatusMessage.getPortNo(), datapathId, e);
389 public void processPacketInMessage(final PacketInMessage packetInMessage) {
390 if (isMasterOfDevice()) {
391 final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
392 handlePacketInMessage(packetReceived, packetInMessage.implementedInterface(), packetReceived.getMatch());
394 LOG.debug("Controller is not owner of the device {}, skipping packet_in message", deviceInfo.getLOGValue());
398 private Boolean isMasterOfDevice() {
399 final ContextChain contextChain = contextChainHolder.getContextChain(deviceInfo);
400 boolean result = false;
401 if (contextChain != null) {
402 result = contextChain.isMastered(ContextChainMastershipState.CHECK, false);
407 private void handlePacketInMessage(final PacketIn packetIn,
408 final Class<?> implementedInterface,
410 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH);
411 final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
413 if (packetIn == null) {
414 LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
415 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
419 final OpenflowVersion openflowVersion = OpenflowVersion.get(deviceInfo.getVersion());
421 // Try to get ingress from match
422 final NodeConnectorRef nodeConnectorRef = packetIn.getIngress() != null
423 ? packetIn.getIngress() : Optional.ofNullable(match)
424 .map(Match::getInPort)
425 .map(nodeConnectorId -> InventoryDataServiceUtil
426 .portNumberfromNodeConnectorId(
429 .map(portNumber -> InventoryDataServiceUtil
430 .nodeConnectorRefFromDatapathIdPortno(
431 deviceInfo.getDatapathId(),
436 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
438 if (!packetInLimiter.acquirePermit()) {
439 LOG.debug("Packet limited");
440 // TODO: save packet into emergency slot if possible
441 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
442 .FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
446 final ListenableFuture<?> offerNotification = notificationPublishService
447 .offerNotification(new PacketReceivedBuilder(packetIn)
448 .setIngress(nodeConnectorRef)
449 .setMatch(MatchUtil.transformMatch(match,
450 org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.packet.received
454 if (NotificationPublishService.REJECTED.equals(offerNotification)) {
455 LOG.debug("notification offer rejected");
456 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
457 packetInLimiter.drainLowWaterMark();
458 packetInLimiter.releasePermit();
462 Futures.addCallback(offerNotification, new FutureCallback<Object>() {
464 public void onSuccess(final Object result) {
465 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
466 packetInLimiter.releasePermit();
470 public void onFailure(final Throwable throwable) {
471 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
472 .FROM_SWITCH_NOTIFICATION_REJECTED);
473 LOG.debug("notification offer failed: {}", throwable.getMessage());
474 LOG.trace("notification offer failed..", throwable);
475 packetInLimiter.releasePermit();
477 }, MoreExecutors.directExecutor());
481 public void processExperimenterMessage(final ExperimenterMessage notification) {
482 if (isMasterOfDevice()) {
484 final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
485 final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
486 getDeviceInfo().getVersion(),
487 (Class<? extends ExperimenterDataOfChoice>) vendorData.implementedInterface());
488 final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter =
489 extensionConverterProvider.getMessageConverter(key);
490 if (messageConverter == null) {
491 LOG.warn("custom converter for {}[OF:{}] not found",
492 notification.getExperimenterDataOfChoice().implementedInterface(),
493 getDeviceInfo().getVersion());
496 // build notification
497 final ExperimenterMessageOfChoice messageOfChoice;
498 messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
499 final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new
500 ExperimenterMessageFromDevBuilder()
501 .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
502 .setExperimenterMessageOfChoice(messageOfChoice);
504 notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
506 LOG.debug("Controller is not owner of the device {}, skipping experimenter message",
507 deviceInfo.getLOGValue());
512 // The cast to PacketInMessage is safe as the implemented interface is verified before the cas tbut FB doesn't
514 @SuppressFBWarnings("BC_UNCONFIRMED_CAST")
515 public boolean processAlienMessage(final OfHeader message) {
516 final Class<? extends DataContainer> implementedInterface = message.implementedInterface();
518 if (org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketInMessage.class
519 .equals(implementedInterface)) {
520 final org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
521 .PacketInMessage packetInMessage = (org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
522 .rev130709.PacketInMessage) message;
524 handlePacketInMessage(packetInMessage, implementedInterface, packetInMessage.getMatch());
532 public TranslatorLibrary oook() {
533 return translatorLibrary;
537 public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
538 this.notificationPublishService = notificationPublishService;
542 public MessageSpy getMessageSpy() {
547 public void onPublished() {
548 primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
552 public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>>
554 return new MultiMsgCollectorImpl<>(this, requestContext);
558 public void updatePacketInRateLimit(final long upperBound) {
559 packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound),
560 (int) (HIGH_WATERMARK_FACTOR * upperBound));
564 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
565 this.extensionConverterProvider = extensionConverterProvider;
569 public ExtensionConverterProvider getExtensionConverterProvider() {
570 return extensionConverterProvider;
574 TransactionChainManager getTransactionChainManager() {
575 return this.transactionChainManager;
579 public ListenableFuture<?> closeServiceInstance() {
580 final ListenableFuture<?> listenableFuture = initialized.get()
581 ? transactionChainManager.deactivateTransactionManager()
582 : Futures.immediateFuture(null);
584 hashedWheelTimer.newTimeout((timerTask) -> {
585 if (!listenableFuture.isDone() && !listenableFuture.isCancelled()) {
586 listenableFuture.cancel(true);
588 }, TX_CHAIN_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS);
590 return listenableFuture;
594 public DeviceInfo getDeviceInfo() {
595 return this.deviceInfo;
599 public void registerMastershipWatcher(@NonNull final ContextChainMastershipWatcher newWatcher) {
600 this.contextChainMastershipWatcher = newWatcher;
605 public ServiceGroupIdentifier getIdentifier() {
606 return deviceInfo.getServiceIdentifier();
610 public void close() {
611 // Close all datastore registries and transactions
612 if (initialized.getAndSet(false)) {
613 deviceGroupRegistry.close();
614 deviceFlowRegistry.close();
615 deviceMeterRegistry.close();
617 final ListenableFuture<?> txChainShuttingDown = transactionChainManager.shuttingDown();
619 Futures.addCallback(txChainShuttingDown, new FutureCallback<Object>() {
621 public void onSuccess(final Object result) {
622 transactionChainManager.close();
623 transactionChainManager = null;
627 public void onFailure(final Throwable throwable) {
628 transactionChainManager.close();
629 transactionChainManager = null;
631 }, MoreExecutors.directExecutor());
634 requestContexts.forEach(requestContext -> RequestContextUtil
635 .closeRequestContextWithRpcError(requestContext, "Connection closed."));
636 requestContexts.clear();
640 public boolean canUseSingleLayerSerialization() {
641 return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
645 public void instantiateServiceInstance() {
646 lazyTransactionManagerInitialization();
649 // TODO: exception handling should be fixed by using custom checked exception, never RuntimeExceptions
651 @SuppressWarnings({"checkstyle:IllegalCatch"})
652 public void initializeDevice() {
653 LOG.debug("Device initialization started for device {}", deviceInfo);
655 final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
656 .retrieveAndClearPortStatusMessages();
657 portStatusMessages.forEach(this::writePortStatusMessage);
659 } catch (final Exception ex) {
660 throw new RuntimeException(String.format("Error processing port status messages from device %s: %s",
661 deviceInfo.toString(), ex.toString()), ex);
664 final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
665 .lookup(deviceInfo.getVersion());
667 if (initializer.isPresent()) {
668 final Future<Void> initialize = initializer
670 .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor);
673 initialize.get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
674 } catch (TimeoutException ex) {
675 initialize.cancel(true);
676 throw new RuntimeException(String.format("Failed to initialize device %s in %ss: %s",
677 deviceInfo.toString(), String.valueOf(DEVICE_INIT_TIMEOUT / 1000), ex.toString()), ex);
678 } catch (ExecutionException | InterruptedException ex) {
679 throw new RuntimeException(
680 String.format("Device %s cannot be initialized: %s", deviceInfo.toString(), ex.toString()), ex);
683 throw new RuntimeException(String.format("Unsupported version %s for device %s",
684 deviceInfo.getVersion(),
685 deviceInfo.toString()));
688 final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill =
689 getDeviceFlowRegistry().fill();
690 Futures.addCallback(deviceFlowRegistryFill,
691 new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher),
692 MoreExecutors.directExecutor());
696 void lazyTransactionManagerInitialization() {
697 if (!this.initialized.get()) {
698 if (LOG.isDebugEnabled()) {
699 LOG.debug("Transaction chain manager for node {} created", deviceInfo);
701 this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo.getNodeId().getValue());
702 this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker,
703 deviceInfo.getNodeInstanceIdentifier());
704 this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
705 this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
708 transactionChainManager.activateTransactionManager();
709 initialized.set(true);
714 public <T> RequestContext<T> createRequestContext() {
715 final Long xid = deviceInfo.reserveXidForDeviceMessage();
717 final AbstractRequestContext<T> abstractRequestContext = new AbstractRequestContext<T>(xid) {
719 public void close() {
720 requestContexts.remove(this);
724 requestContexts.add(abstractRequestContext);
725 return abstractRequestContext;
729 public void onStateAcquired(final ContextChainState state) {
733 private class DeviceFlowRegistryCallback implements FutureCallback<List<Optional<FlowCapableNode>>> {
734 private final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill;
735 private final ContextChainMastershipWatcher contextChainMastershipWatcher;
737 DeviceFlowRegistryCallback(
738 ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill,
739 ContextChainMastershipWatcher contextChainMastershipWatcher) {
740 this.deviceFlowRegistryFill = deviceFlowRegistryFill;
741 this.contextChainMastershipWatcher = contextChainMastershipWatcher;
745 public void onSuccess(List<Optional<FlowCapableNode>> result) {
746 if (LOG.isDebugEnabled()) {
747 // Count all flows we read from datastore for debugging purposes.
748 // This number do not always represent how many flows were actually added
749 // to DeviceFlowRegistry, because of possible duplicates.
751 if (result != null) {
752 for (Optional<FlowCapableNode> optNode : result) {
753 if (optNode.isPresent()) {
754 flowCount += optNode.get().nonnullTable().stream()
755 .filter(Objects::nonNull)
756 .flatMap(table -> table.nonnullFlow().stream())
757 .filter(Objects::nonNull)
763 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo);
768 public void onFailure(Throwable throwable) {
769 if (deviceFlowRegistryFill.isCancelled()) {
770 if (LOG.isDebugEnabled()) {
771 LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo);
774 LOG.warn("Failed filling flow registry with flows for node: {}", deviceInfo, throwable);
776 contextChainMastershipWatcher.onNotAbleToStartMastership(
778 "Was not able to fill flow registry on device",