2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import io.netty.util.HashedWheelTimer;
17 import java.util.Collection;
18 import java.util.Collections;
19 import java.util.List;
20 import java.util.Objects;
21 import java.util.Optional;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27 import java.util.concurrent.atomic.AtomicBoolean;
28 import javax.annotation.Nonnull;
29 import javax.annotation.Nullable;
30 import org.opendaylight.mdsal.binding.api.DataBroker;
31 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
32 import org.opendaylight.mdsal.binding.api.ReadTransaction;
33 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
34 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
35 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
36 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
37 import org.opendaylight.openflowplugin.api.OFConstants;
38 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
42 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
43 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
44 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
45 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
46 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
48 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder;
49 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
50 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
51 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
52 import org.opendaylight.openflowplugin.api.openflow.lifecycle.DeviceInitializationContext;
53 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
54 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
55 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
56 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
57 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
58 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
59 import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
60 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
61 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
62 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
63 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
64 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
65 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
66 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
67 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
68 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
69 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
70 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
71 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
72 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
73 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
74 import org.opendaylight.openflowplugin.impl.util.MatchUtil;
75 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
76 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.Match;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketIn;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
102 import org.opendaylight.yangtools.yang.binding.DataContainer;
103 import org.opendaylight.yangtools.yang.binding.DataObject;
104 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
105 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
106 import org.slf4j.Logger;
107 import org.slf4j.LoggerFactory;
109 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper, DeviceInitializationContext {
111 private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
112 private static final Logger OF_EVENT_LOG = LoggerFactory.getLogger("OfEventLog");
114 // TODO: drain factor should be parametrized
115 private static final float REJECTED_DRAIN_FACTOR = 0.25f;
116 // TODO: low water mark factor should be parametrized
117 private static final float LOW_WATERMARK_FACTOR = 0.75f;
118 // TODO: high water mark factor should be parametrized
119 private static final float HIGH_WATERMARK_FACTOR = 0.95f;
121 // Timeout in milliseconds after what we will give up on initializing device
122 private static final int DEVICE_INIT_TIMEOUT = 9000;
124 // Timeout in milliseconds after what we will give up on closing transaction chain
125 private static final int TX_CHAIN_CLOSE_TIMEOUT = 10000;
127 private static final int LOW_WATERMARK = 1000;
128 private static final int HIGH_WATERMARK = 2000;
130 private final MultipartWriterProvider writerProvider;
131 private final HashedWheelTimer hashedWheelTimer;
132 private final DeviceState deviceState;
133 private final DataBroker dataBroker;
134 private final Collection<RequestContext<?>> requestContexts = ConcurrentHashMap.newKeySet();
135 private final MessageSpy messageSpy;
136 private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
137 private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
138 private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight
139 .flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
140 private final TranslatorLibrary translatorLibrary;
141 private final ConvertorExecutor convertorExecutor;
142 private final DeviceInitializerProvider deviceInitializerProvider;
143 private final PacketInRateLimiter packetInLimiter;
144 private final DeviceInfo deviceInfo;
145 private final ConnectionContext primaryConnectionContext;
146 private final boolean skipTableFeatures;
147 private final boolean switchFeaturesMandatory;
148 private final boolean isFlowRemovedNotificationOn;
149 private final boolean useSingleLayerSerialization;
150 private final AtomicBoolean initialized = new AtomicBoolean(false);
151 private final AtomicBoolean hasState = new AtomicBoolean(false);
152 private final AtomicBoolean isInitialTransactionSubmitted = new AtomicBoolean(false);
153 private final ContextChainHolder contextChainHolder;
154 private NotificationPublishService notificationPublishService;
155 private TransactionChainManager transactionChainManager;
156 private DeviceFlowRegistry deviceFlowRegistry;
157 private DeviceGroupRegistry deviceGroupRegistry;
158 private DeviceMeterRegistry deviceMeterRegistry;
159 private ExtensionConverterProvider extensionConverterProvider;
160 private ContextChainMastershipWatcher contextChainMastershipWatcher;
162 DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
163 @Nonnull final DataBroker dataBroker,
164 @Nonnull final MessageSpy messageSpy,
165 @Nonnull final TranslatorLibrary translatorLibrary,
166 final ConvertorExecutor convertorExecutor,
167 final boolean skipTableFeatures,
168 final HashedWheelTimer hashedWheelTimer,
169 final boolean useSingleLayerSerialization,
170 final DeviceInitializerProvider deviceInitializerProvider,
171 final boolean isFlowRemovedNotificationOn,
172 final boolean switchFeaturesMandatory,
173 final ContextChainHolder contextChainHolder) {
175 this.primaryConnectionContext = primaryConnectionContext;
176 this.deviceInfo = primaryConnectionContext.getDeviceInfo();
177 this.hashedWheelTimer = hashedWheelTimer;
178 this.deviceInitializerProvider = deviceInitializerProvider;
179 this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
180 this.switchFeaturesMandatory = switchFeaturesMandatory;
181 this.deviceState = new DeviceStateImpl();
182 this.dataBroker = dataBroker;
183 this.messageSpy = messageSpy;
184 this.contextChainHolder = contextChainHolder;
186 this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
187 /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
189 this.translatorLibrary = translatorLibrary;
190 this.portStatusTranslator = translatorLibrary.lookupTranslator(
191 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
192 this.packetInTranslator = translatorLibrary.lookupTranslator(
193 new TranslatorKey(deviceInfo.getVersion(), org.opendaylight.yang.gen.v1.urn.opendaylight.openflow
195 .PacketIn.class.getName()));
196 this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
197 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
199 this.convertorExecutor = convertorExecutor;
200 this.skipTableFeatures = skipTableFeatures;
201 this.useSingleLayerSerialization = useSingleLayerSerialization;
202 writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
206 public boolean initialSubmitTransaction() {
207 if (!initialized.get()) {
211 final boolean initialSubmit = transactionChainManager.initialSubmitWriteTransaction();
212 isInitialTransactionSubmitted.set(initialSubmit);
213 return initialSubmit;
217 public DeviceState getDeviceState() {
222 public ReadTransaction getReadTransaction() {
223 return dataBroker.newReadOnlyTransaction();
227 public boolean isTransactionsEnabled() {
228 return isInitialTransactionSubmitted.get();
232 public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
233 final InstanceIdentifier<T> path,
235 if (initialized.get()) {
236 transactionChainManager.writeToTransaction(store, path, data, false);
241 public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
242 final InstanceIdentifier<T> path,
244 if (initialized.get()) {
245 transactionChainManager.writeToTransaction(store, path, data, true);
250 public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store,
251 final InstanceIdentifier<T> path) {
252 if (initialized.get()) {
253 transactionChainManager.addDeleteOperationToTxChain(store, path);
258 public boolean submitTransaction() {
259 return initialized.get() && transactionChainManager.submitTransaction();
263 public boolean syncSubmitTransaction() {
264 return initialized.get() && transactionChainManager.submitTransaction(true);
268 public ConnectionContext getPrimaryConnectionContext() {
269 return primaryConnectionContext;
273 public DeviceFlowRegistry getDeviceFlowRegistry() {
274 return deviceFlowRegistry;
278 public DeviceGroupRegistry getDeviceGroupRegistry() {
279 return deviceGroupRegistry;
283 public DeviceMeterRegistry getDeviceMeterRegistry() {
284 return deviceMeterRegistry;
288 public void processReply(final OfHeader ofHeader) {
289 messageSpy.spyMessage(
290 ofHeader.implementedInterface(),
291 ofHeader instanceof Error
292 ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
293 : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
297 public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
298 ofHeaderList.forEach(header -> messageSpy.spyMessage(
299 header.implementedInterface(),
300 header instanceof Error
301 ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
302 : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
306 public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
307 if (isMasterOfDevice()) {
308 //1. translate to general flow (table, priority, match, cookie)
309 final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819
310 .FlowRemoved flowRemovedNotification = flowRemovedTranslator
311 .translate(flowRemoved, deviceInfo, null);
313 if (isFlowRemovedNotificationOn) {
314 // Trigger off a notification
315 notificationPublishService.offerNotification(flowRemovedNotification);
318 LOG.debug("Controller is not owner of the device {}, skipping Flow Removed message",
319 deviceInfo.getLOGValue());
324 @SuppressWarnings("checkstyle:IllegalCatch")
325 public void processPortStatusMessage(final PortStatusMessage portStatus) {
326 messageSpy.spyMessage(portStatus.implementedInterface(), MessageSpy.StatisticsGroup
327 .FROM_SWITCH_PUBLISHED_SUCCESS);
329 if (initialized.get()) {
331 writePortStatusMessage(portStatus);
332 } catch (final Exception e) {
333 LOG.warn("Error processing port status message for port {} on device {}",
334 portStatus.getPortNo(), getDeviceInfo(), e);
336 } else if (!hasState.get()) {
337 primaryConnectionContext.handlePortStatusMessage(portStatus);
341 private void writePortStatusMessage(final PortStatus portStatusMessage) {
342 final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
343 .translate(portStatusMessage, getDeviceInfo(), null);
344 OF_EVENT_LOG.debug("Node Connector Status, Node: {}, PortNumber: {}, PortName: {}, Reason: {}",
345 deviceInfo.getDatapathId(), portStatusMessage.getPortNo(), portStatusMessage.getName(),
346 portStatusMessage.getReason());
348 final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
349 .getNodeInstanceIdentifier()
350 .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
351 .nodeConnectorIdfromDatapathPortNo(
352 deviceInfo.getDatapathId(),
353 portStatusMessage.getPortNo(),
354 OpenflowVersion.get(deviceInfo.getVersion()))));
356 writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
357 .withKey(iiToNodeConnector.getKey())
358 .addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new
359 FlowCapableNodeConnectorStatisticsDataBuilder().build())
360 .addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector)
362 syncSubmitTransaction();
363 if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
364 addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
365 syncSubmitTransaction();
370 public void processPacketInMessage(final PacketInMessage packetInMessage) {
371 if (isMasterOfDevice()) {
372 final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
373 handlePacketInMessage(packetReceived, packetInMessage.implementedInterface(), packetReceived.getMatch());
375 LOG.debug("Controller is not owner of the device {}, skipping packet_in message", deviceInfo.getLOGValue());
379 private Boolean isMasterOfDevice() {
380 final ContextChain contextChain = contextChainHolder.getContextChain(deviceInfo);
381 boolean result = false;
382 if (contextChain != null) {
383 result = contextChain.isMastered(ContextChainMastershipState.CHECK, false);
388 private void handlePacketInMessage(final PacketIn packetIn,
389 final Class<?> implementedInterface,
391 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH);
392 final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
394 if (packetIn == null) {
395 LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
396 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
400 final OpenflowVersion openflowVersion = OpenflowVersion.get(deviceInfo.getVersion());
402 // Try to get ingress from match
403 final NodeConnectorRef nodeConnectorRef = packetIn.getIngress() != null
404 ? packetIn.getIngress() : Optional.ofNullable(match)
405 .map(Match::getInPort)
406 .map(nodeConnectorId -> InventoryDataServiceUtil
407 .portNumberfromNodeConnectorId(
410 .map(portNumber -> InventoryDataServiceUtil
411 .nodeConnectorRefFromDatapathIdPortno(
412 deviceInfo.getDatapathId(),
417 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
419 if (!packetInLimiter.acquirePermit()) {
420 LOG.debug("Packet limited");
421 // TODO: save packet into emergency slot if possible
422 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
423 .FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
427 final ListenableFuture<?> offerNotification = notificationPublishService
428 .offerNotification(new PacketReceivedBuilder(packetIn)
429 .setIngress(nodeConnectorRef)
430 .setMatch(MatchUtil.transformMatch(match,
431 org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.packet.received
435 if (NotificationPublishService.REJECTED.equals(offerNotification)) {
436 LOG.debug("notification offer rejected");
437 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
438 packetInLimiter.drainLowWaterMark();
439 packetInLimiter.releasePermit();
443 Futures.addCallback(offerNotification, new FutureCallback<Object>() {
445 public void onSuccess(final Object result) {
446 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
447 packetInLimiter.releasePermit();
451 public void onFailure(final Throwable throwable) {
452 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
453 .FROM_SWITCH_NOTIFICATION_REJECTED);
454 LOG.debug("notification offer failed: {}", throwable.getMessage());
455 LOG.trace("notification offer failed..", throwable);
456 packetInLimiter.releasePermit();
458 }, MoreExecutors.directExecutor());
462 public void processExperimenterMessage(final ExperimenterMessage notification) {
463 if (isMasterOfDevice()) {
465 final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
466 final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
467 getDeviceInfo().getVersion(),
468 (Class<? extends ExperimenterDataOfChoice>) vendorData.implementedInterface());
469 final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter =
470 extensionConverterProvider.getMessageConverter(key);
471 if (messageConverter == null) {
472 LOG.warn("custom converter for {}[OF:{}] not found",
473 notification.getExperimenterDataOfChoice().implementedInterface(),
474 getDeviceInfo().getVersion());
477 // build notification
478 final ExperimenterMessageOfChoice messageOfChoice;
479 messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
480 final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new
481 ExperimenterMessageFromDevBuilder()
482 .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
483 .setExperimenterMessageOfChoice(messageOfChoice);
485 notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
487 LOG.debug("Controller is not owner of the device {}, skipping experimenter message",
488 deviceInfo.getLOGValue());
493 // The cast to PacketInMessage is safe as the implemented interface is verified before the cas tbut FB doesn't
495 @SuppressFBWarnings("BC_UNCONFIRMED_CAST")
496 public boolean processAlienMessage(final OfHeader message) {
497 final Class<? extends DataContainer> implementedInterface = message.implementedInterface();
499 if (org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketInMessage.class
500 .equals(implementedInterface)) {
501 final org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
502 .PacketInMessage packetInMessage = (org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
503 .rev130709.PacketInMessage) message;
505 handlePacketInMessage(packetInMessage, implementedInterface, packetInMessage.getMatch());
513 public TranslatorLibrary oook() {
514 return translatorLibrary;
518 public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
519 this.notificationPublishService = notificationPublishService;
523 public MessageSpy getMessageSpy() {
528 public void onPublished() {
529 primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
533 public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>>
535 return new MultiMsgCollectorImpl<>(this, requestContext);
539 public void updatePacketInRateLimit(final long upperBound) {
540 packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound),
541 (int) (HIGH_WATERMARK_FACTOR * upperBound));
545 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
546 this.extensionConverterProvider = extensionConverterProvider;
550 public ExtensionConverterProvider getExtensionConverterProvider() {
551 return extensionConverterProvider;
555 TransactionChainManager getTransactionChainManager() {
556 return this.transactionChainManager;
560 public ListenableFuture<?> closeServiceInstance() {
561 final ListenableFuture<?> listenableFuture = initialized.get()
562 ? transactionChainManager.deactivateTransactionManager()
563 : Futures.immediateFuture(null);
565 hashedWheelTimer.newTimeout((timerTask) -> {
566 if (!listenableFuture.isDone() && !listenableFuture.isCancelled()) {
567 listenableFuture.cancel(true);
569 }, TX_CHAIN_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS);
571 return listenableFuture;
575 public DeviceInfo getDeviceInfo() {
576 return this.deviceInfo;
580 public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher newWatcher) {
581 this.contextChainMastershipWatcher = newWatcher;
586 public ServiceGroupIdentifier getIdentifier() {
587 return deviceInfo.getServiceIdentifier();
591 public void close() {
592 // Close all datastore registries and transactions
593 if (initialized.getAndSet(false)) {
594 deviceGroupRegistry.close();
595 deviceFlowRegistry.close();
596 deviceMeterRegistry.close();
598 final ListenableFuture<?> txChainShuttingDown = transactionChainManager.shuttingDown();
600 Futures.addCallback(txChainShuttingDown, new FutureCallback<Object>() {
602 public void onSuccess(final Object result) {
603 transactionChainManager.close();
604 transactionChainManager = null;
608 public void onFailure(final Throwable throwable) {
609 transactionChainManager.close();
610 transactionChainManager = null;
612 }, MoreExecutors.directExecutor());
615 requestContexts.forEach(requestContext -> RequestContextUtil
616 .closeRequestContextWithRpcError(requestContext, "Connection closed."));
617 requestContexts.clear();
621 public boolean canUseSingleLayerSerialization() {
622 return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
626 public void instantiateServiceInstance() {
627 lazyTransactionManagerInitialization();
630 // TODO: exception handling should be fixed by using custom checked exception, never RuntimeExceptions
632 @SuppressWarnings({"checkstyle:IllegalCatch"})
633 public void initializeDevice() {
634 LOG.debug("Device initialization started for device {}", deviceInfo);
636 final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
637 .retrieveAndClearPortStatusMessages();
639 portStatusMessages.forEach(this::writePortStatusMessage);
641 } catch (final Exception ex) {
642 throw new RuntimeException(String.format("Error processing port status messages from device %s: %s",
643 deviceInfo.toString(), ex.toString()), ex);
646 final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
647 .lookup(deviceInfo.getVersion());
649 if (initializer.isPresent()) {
650 final Future<Void> initialize = initializer
652 .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor);
655 initialize.get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
656 } catch (TimeoutException ex) {
657 initialize.cancel(true);
658 throw new RuntimeException(String.format("Failed to initialize device %s in %ss: %s",
659 deviceInfo.toString(), String.valueOf(DEVICE_INIT_TIMEOUT / 1000), ex.toString()), ex);
660 } catch (ExecutionException | InterruptedException ex) {
661 throw new RuntimeException(
662 String.format("Device %s cannot be initialized: %s", deviceInfo.toString(), ex.toString()), ex);
665 throw new RuntimeException(String.format("Unsupported version %s for device %s",
666 deviceInfo.getVersion(),
667 deviceInfo.toString()));
670 final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill =
671 getDeviceFlowRegistry().fill();
672 Futures.addCallback(deviceFlowRegistryFill,
673 new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher),
674 MoreExecutors.directExecutor());
678 void lazyTransactionManagerInitialization() {
679 if (!this.initialized.get()) {
680 if (LOG.isDebugEnabled()) {
681 LOG.debug("Transaction chain manager for node {} created", deviceInfo);
683 this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo.getNodeId().getValue());
684 this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker,
685 deviceInfo.getNodeInstanceIdentifier());
686 this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
687 this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
690 transactionChainManager.activateTransactionManager();
691 initialized.set(true);
696 public <T> RequestContext<T> createRequestContext() {
697 final Long xid = deviceInfo.reserveXidForDeviceMessage();
699 final AbstractRequestContext<T> abstractRequestContext = new AbstractRequestContext<T>(xid) {
701 public void close() {
702 requestContexts.remove(this);
706 requestContexts.add(abstractRequestContext);
707 return abstractRequestContext;
711 public void onStateAcquired(final ContextChainState state) {
715 private class DeviceFlowRegistryCallback implements FutureCallback<List<Optional<FlowCapableNode>>> {
716 private final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill;
717 private final ContextChainMastershipWatcher contextChainMastershipWatcher;
719 DeviceFlowRegistryCallback(
720 ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill,
721 ContextChainMastershipWatcher contextChainMastershipWatcher) {
722 this.deviceFlowRegistryFill = deviceFlowRegistryFill;
723 this.contextChainMastershipWatcher = contextChainMastershipWatcher;
727 public void onSuccess(List<Optional<FlowCapableNode>> result) {
728 if (LOG.isDebugEnabled()) {
729 // Count all flows we read from datastore for debugging purposes.
730 // This number do not always represent how many flows were actually added
731 // to DeviceFlowRegistry, because of possible duplicates.
732 long flowCount = Optional.ofNullable(result)
733 .map(Collections::singleton)
734 .orElse(Collections.emptySet())
736 .flatMap(Collection::stream)
737 .filter(Objects::nonNull)
738 .flatMap(flowCapableNodeOptional
739 -> com.google.common.base.Optional.fromJavaUtil(flowCapableNodeOptional).asSet().stream())
740 .filter(Objects::nonNull)
741 .filter(flowCapableNode -> flowCapableNode.getTable() != null)
742 .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
743 .filter(Objects::nonNull)
744 .filter(table -> table.getFlow() != null)
745 .flatMap(table -> table.getFlow().stream())
746 .filter(Objects::nonNull)
749 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo);
754 public void onFailure(Throwable throwable) {
755 if (deviceFlowRegistryFill.isCancelled()) {
756 if (LOG.isDebugEnabled()) {
757 LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo);
760 LOG.warn("Failed filling flow registry with flows for node: {}", deviceInfo, throwable);
762 contextChainMastershipWatcher.onNotAbleToStartMastership(
764 "Was not able to fill flow registry on device",