2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import io.netty.util.HashedWheelTimer;
17 import java.util.Collection;
18 import java.util.Collections;
19 import java.util.List;
20 import java.util.Objects;
21 import java.util.Optional;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27 import java.util.concurrent.atomic.AtomicBoolean;
28 import javax.annotation.Nonnull;
29 import javax.annotation.Nullable;
30 import org.opendaylight.mdsal.binding.api.DataBroker;
31 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
32 import org.opendaylight.mdsal.binding.api.ReadTransaction;
33 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
34 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
35 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
36 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
37 import org.opendaylight.openflowplugin.api.OFConstants;
38 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
42 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
43 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
44 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
45 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
46 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
48 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder;
49 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
50 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
51 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
52 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
53 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
54 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
55 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
56 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
57 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
58 import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
59 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
60 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
61 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
62 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
63 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
64 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
65 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
66 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
67 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
68 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
69 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
70 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
71 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
72 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
73 import org.opendaylight.openflowplugin.impl.util.MatchUtil;
74 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
75 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.Match;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketIn;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
101 import org.opendaylight.yangtools.yang.binding.DataContainer;
102 import org.opendaylight.yangtools.yang.binding.DataObject;
103 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
104 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
105 import org.slf4j.Logger;
106 import org.slf4j.LoggerFactory;
/**
 * {@link DeviceContext} implementation bound to one {@link DeviceInfo}; it also keeps an
 * {@link ExtensionConverterProvider} (see {@link ExtensionConverterProviderKeeper}).
 */
// NOTE(review): this listing carries embedded line numbers with gaps, so some lines of the class
// are not visible here; comments describe the visible lines only.
108 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper {
110 private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
112 // TODO: drain factor should be parametrized
// Passed to the PacketInRateLimiter constructor.
113 private static final float REJECTED_DRAIN_FACTOR = 0.25f;
114 // TODO: low water mark factor should be parametrized
// Multiplied by the requested upper bound in updatePacketInRateLimit() to get the low water mark.
115 private static final float LOW_WATERMARK_FACTOR = 0.75f;
116 // TODO: high water mark factor should be parametrized
// Multiplied by the requested upper bound in updatePacketInRateLimit() to get the high water mark.
117 private static final float HIGH_WATERMARK_FACTOR = 0.95f;
119 // Timeout in milliseconds after which we give up on initializing the device
120 private static final int DEVICE_INIT_TIMEOUT = 9000;
122 // Timeout in milliseconds after which we give up on closing the transaction chain
123 private static final int TX_CHAIN_CLOSE_TIMEOUT = 10000;
// Initial low/high water marks handed to the PacketInRateLimiter in the constructor.
125 private static final int LOW_WATERMARK = 1000;
126 private static final int HIGH_WATERMARK = 2000;
128 private final MultipartWriterProvider writerProvider;
129 private final HashedWheelTimer hashedWheelTimer;
130 private final DeviceState deviceState;
131 private final DataBroker dataBroker;
// Concurrent set of request contexts created by createRequestContext(); failed and cleared in close().
132 private final Collection<RequestContext<?>> requestContexts = ConcurrentHashMap.newKeySet();
133 private final MessageSpy messageSpy;
// Translators looked up once in the constructor for the device's OpenFlow version.
134 private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
135 private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
136 private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight
137 .flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
138 private final TranslatorLibrary translatorLibrary;
139 private final ConvertorExecutor convertorExecutor;
140 private final DeviceInitializerProvider deviceInitializerProvider;
141 private final PacketInRateLimiter packetInLimiter;
142 private final DeviceInfo deviceInfo;
143 private final ConnectionContext primaryConnectionContext;
144 private final boolean skipTableFeatures;
145 private final boolean switchFeaturesMandatory;
146 private final boolean isFlowRemovedNotificationOn;
147 private final boolean useSingleLayerSerialization;
// Set to true by lazyTransactionManagerInitialization() and reset by close(); guards every use of
// transactionChainManager in the transaction helper methods.
148 private final AtomicBoolean initialized = new AtomicBoolean(false);
// Read by processPortStatusMessage(); no write to this flag is visible in this listing.
149 private final AtomicBoolean hasState = new AtomicBoolean(false);
// Holds the outcome of the last initialSubmitTransaction() call.
150 private final AtomicBoolean isInitialTransactionSubmitted = new AtomicBoolean(false);
151 private final ContextChainHolder contextChainHolder;
// Mutable collaborators: injected through setters or created by lazyTransactionManagerInitialization().
152 private NotificationPublishService notificationPublishService;
153 private TransactionChainManager transactionChainManager;
154 private DeviceFlowRegistry deviceFlowRegistry;
155 private DeviceGroupRegistry deviceGroupRegistry;
156 private DeviceMeterRegistry deviceMeterRegistry;
157 private ExtensionConverterProvider extensionConverterProvider;
158 private ContextChainMastershipWatcher contextChainMastershipWatcher;
/**
 * Creates the device context for the device reachable through {@code primaryConnectionContext}.
 *
 * <p>Stores the supplied collaborators and flags, creates a fresh {@code DeviceStateImpl} and a
 * {@code PacketInRateLimiter} with the initial water marks, looks up the port-status, packet-in and
 * flow-removed translators for the device's version, and finally creates the default multipart
 * writer provider for this context.
 *
 * @param primaryConnectionContext connection context; also the source of the {@link DeviceInfo}
 * @param dataBroker data broker used for read transactions and by the lazily created managers
 * @param messageSpy statistics sink for processed messages
 * @param translatorLibrary library the message translators are looked up in
 * @param convertorExecutor forwarded to the device initializer
 * @param skipTableFeatures forwarded to the device initializer
 * @param hashedWheelTimer timer used to schedule the transaction-chain close timeout
 * @param useSingleLayerSerialization see {@code canUseSingleLayerSerialization()}
 * @param deviceInitializerProvider provider of version specific device initializers
 * @param isFlowRemovedNotificationOn whether flow-removed messages are published as notifications
 * @param switchFeaturesMandatory forwarded to the device initializer
 * @param contextChainHolder used to look up the context chain for mastership checks
 */
160 DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
161 @Nonnull final DataBroker dataBroker,
162 @Nonnull final MessageSpy messageSpy,
163 @Nonnull final TranslatorLibrary translatorLibrary,
164 final ConvertorExecutor convertorExecutor,
165 final boolean skipTableFeatures,
166 final HashedWheelTimer hashedWheelTimer,
167 final boolean useSingleLayerSerialization,
168 final DeviceInitializerProvider deviceInitializerProvider,
169 final boolean isFlowRemovedNotificationOn,
170 final boolean switchFeaturesMandatory,
171 final ContextChainHolder contextChainHolder) {
173 this.primaryConnectionContext = primaryConnectionContext;
174 this.deviceInfo = primaryConnectionContext.getDeviceInfo();
175 this.hashedWheelTimer = hashedWheelTimer;
176 this.deviceInitializerProvider = deviceInitializerProvider;
177 this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
178 this.switchFeaturesMandatory = switchFeaturesMandatory;
179 this.deviceState = new DeviceStateImpl();
180 this.dataBroker = dataBroker;
181 this.messageSpy = messageSpy;
182 this.contextChainHolder = contextChainHolder;
// The limiter works on the primary connection's adapter and reports to the same message spy.
184 this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
185 /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
187 this.translatorLibrary = translatorLibrary;
// Translators are keyed by (device version, source message class name).
188 this.portStatusTranslator = translatorLibrary.lookupTranslator(
189 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
190 this.packetInTranslator = translatorLibrary.lookupTranslator(
// NOTE(review): embedded line 192 (part of the qualified PacketIn type name) is not visible here.
191 new TranslatorKey(deviceInfo.getVersion(), org.opendaylight.yang.gen.v1.urn.opendaylight.openflow
193 .PacketIn.class.getName()));
194 this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
195 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
197 this.convertorExecutor = convertorExecutor;
198 this.skipTableFeatures = skipTableFeatures;
199 this.useSingleLayerSerialization = useSingleLayerSerialization;
// Created last and handed "this".
200 writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
/**
 * Performs the initial write-transaction submit on the transaction chain manager and records the
 * outcome in {@code isInitialTransactionSubmitted}.
 *
 * @return the value returned by {@code transactionChainManager.initialSubmitWriteTransaction()}
 */
204 public boolean initialSubmitTransaction() {
// Guard for a context that has not been initialized yet.
// NOTE(review): the guard's body (embedded lines 206-207) is not visible in this listing.
205 if (!initialized.get()) {
209 final boolean initialSubmit = transactionChainManager.initialSubmitWriteTransaction();
210 isInitialTransactionSubmitted.set(initialSubmit);
211 return initialSubmit;
/**
 * Accessor for the device state.
 */
// NOTE(review): the method body is not visible in this listing; presumably it returns the
// deviceState field created in the constructor — confirm.
215 public DeviceState getDeviceState() {
/**
 * Opens a new read-only transaction on the data broker.
 *
 * @return the transaction returned by {@code dataBroker.newReadOnlyTransaction()}
 */
220 public ReadTransaction getReadTransaction() {
221 return dataBroker.newReadOnlyTransaction();
/**
 * Reports the outcome recorded by the last {@code initialSubmitTransaction()} call.
 *
 * @return current value of {@code isInitialTransactionSubmitted}
 */
225 public boolean isTransactionsEnabled() {
226 return isInitialTransactionSubmitted.get();
/**
 * Hands a write of {@code data} at {@code path} in {@code store} to the transaction chain manager,
 * passing {@code false} as the last argument. The call is made only while this context is initialized.
 *
 * @param store target datastore
 * @param path instance identifier to write to
 */
// NOTE(review): the third parameter ("data", embedded line 232) is not visible in this listing.
230 public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
231 final InstanceIdentifier<T> path,
233 if (initialized.get()) {
234 transactionChainManager.writeToTransaction(store, path, data, false);
/**
 * Same as {@code writeToTransaction} but passes {@code true} as the last argument to the transaction
 * chain manager (presumably "create missing parents", going by the method name — confirm).
 * The call is made only while this context is initialized.
 *
 * @param store target datastore
 * @param path instance identifier to write to
 */
// NOTE(review): the third parameter ("data", embedded line 241) is not visible in this listing.
239 public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
240 final InstanceIdentifier<T> path,
242 if (initialized.get()) {
243 transactionChainManager.writeToTransaction(store, path, data, true);
/**
 * Adds a delete of {@code path} in {@code store} to the transaction chain. The call is made only
 * while this context is initialized.
 *
 * @param store datastore to delete from
 * @param path instance identifier to delete
 */
248 public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store,
249 final InstanceIdentifier<T> path) {
250 if (initialized.get()) {
251 transactionChainManager.addDeleteOperationToTxChain(store, path);
/**
 * Submits the current transaction through the transaction chain manager.
 *
 * @return {@code false} when this context is not initialized, otherwise the manager's result
 */
256 public boolean submitTransaction() {
257 return initialized.get() && transactionChainManager.submitTransaction();
/**
 * Submits the current transaction through the transaction chain manager, passing {@code true} to
 * {@code submitTransaction} (presumably requesting a synchronous submit, going by the method name).
 *
 * @return {@code false} when this context is not initialized, otherwise the manager's result
 */
261 public boolean syncSubmitTransaction() {
262 return initialized.get() && transactionChainManager.submitTransaction(true);
/**
 * Returns the connection context supplied to the constructor.
 *
 * @return the primary connection context
 */
266 public ConnectionContext getPrimaryConnectionContext() {
267 return primaryConnectionContext;
/**
 * Returns the flow registry; the field is assigned in {@code lazyTransactionManagerInitialization()},
 * so it is {@code null} before that method has run.
 *
 * @return the device flow registry
 */
271 public DeviceFlowRegistry getDeviceFlowRegistry() {
272 return deviceFlowRegistry;
/**
 * Returns the group registry; the field is assigned in {@code lazyTransactionManagerInitialization()},
 * so it is {@code null} before that method has run.
 *
 * @return the device group registry
 */
276 public DeviceGroupRegistry getDeviceGroupRegistry() {
277 return deviceGroupRegistry;
/**
 * Returns the meter registry; the field is assigned in {@code lazyTransactionManagerInitialization()},
 * so it is {@code null} before that method has run.
 *
 * @return the device meter registry
 */
281 public DeviceMeterRegistry getDeviceMeterRegistry() {
282 return deviceMeterRegistry;
/**
 * Records a single reply in the message spy: {@link Error} instances are counted as published
 * failures, everything else as published successes.
 *
 * @param ofHeader reply received from the switch
 */
286 public void processReply(final OfHeader ofHeader) {
287 messageSpy.spyMessage(
288 ofHeader.getImplementedInterface(),
289 ofHeader instanceof Error
290 ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
291 : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
/**
 * Records every reply of a multi-message response in the message spy, using the same
 * failure/success classification as the single-message overload. {@code xid} is not used by the
 * visible code.
 *
 * @param xid transaction id of the request
 * @param ofHeaderList replies received from the switch
 */
295 public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
296 ofHeaderList.forEach(header -> messageSpy.spyMessage(
297 header.getImplementedInterface(),
298 header instanceof Error
299 ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
300 : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
/**
 * Handles a flow-removed message: when this controller masters the device the message is translated
 * and, if flow-removed notifications are enabled, offered to the notification publish service.
 *
 * @param flowRemoved flow-removed message received from the switch
 */
304 public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
305 if (isMasterOfDevice()) {
306 //1. translate to general flow (table, priority, match, cookie)
307 final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819
308 .FlowRemoved flowRemovedNotification = flowRemovedTranslator
309 .translate(flowRemoved, deviceInfo, null);
311 if (isFlowRemovedNotificationOn) {
312 // Trigger off a notification
313 notificationPublishService.offerNotification(flowRemovedNotification);
// NOTE(review): embedded lines 314-315 are not visible; going by its message, the debug log below
// is presumably the not-master branch — confirm.
316 LOG.debug("Controller is not owner of the device {}, skipping Flow Removed message",
317 deviceInfo.getLOGValue());
/**
 * Handles a port-status message. The message is always recorded in the message spy; when this
 * context is initialized it is written to the datastore (failures are logged, not propagated),
 * otherwise, if {@code hasState} is unset, it is handed to the primary connection context.
 *
 * @param portStatus port-status message received from the switch
 */
322 @SuppressWarnings("checkstyle:IllegalCatch")
323 public void processPortStatusMessage(final PortStatusMessage portStatus) {
324 messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.StatisticsGroup
325 .FROM_SWITCH_PUBLISHED_SUCCESS);
327 if (initialized.get()) {
// NOTE(review): the opening of the try block (embedded line 328) is not visible in this listing.
329 writePortStatusMessage(portStatus);
330 } catch (final Exception e) {
331 LOG.warn("Error processing port status message for port {} on device {}",
332 portStatus.getPortNo(), getDeviceInfo(), e);
// Not initialized: pass the message on to the connection context unless hasState is set.
334 } else if (!hasState.get()) {
335 primaryConnectionContext.handlePortStatusMessage(portStatus);
/**
 * Translates a port status into a {@link FlowCapableNodeConnector}, writes the resulting node
 * connector to the operational datastore and submits; when the reported reason is
 * {@code OFPPRDELETE} the node connector is deleted again in a second submit.
 *
 * @param portStatusMessage port status to persist
 */
339 private void writePortStatusMessage(final PortStatus portStatusMessage) {
340 final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
341 .translate(portStatusMessage, getDeviceInfo(), null);
// Node connector path: the device's node path extended by a key derived from datapath id,
// port number and OpenFlow version.
343 final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
344 .getNodeInstanceIdentifier()
345 .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
346 .nodeConnectorIdfromDatapathPortNo(
347 deviceInfo.getDatapathId(),
348 portStatusMessage.getPortNo(),
349 OpenflowVersion.get(deviceInfo.getVersion()))));
// The written node connector carries an empty statistics augmentation plus the translated port data.
351 writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
352 .withKey(iiToNodeConnector.getKey())
353 .addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new
354 FlowCapableNodeConnectorStatisticsDataBuilder().build())
355 .addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector)
// NOTE(review): the end of the builder expression (embedded line 356) is not visible in this listing.
357 syncSubmitTransaction();
358 if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
359 addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
360 syncSubmitTransaction();
/**
 * Handles a packet-in message: when this controller masters the device the message is translated
 * to {@link PacketReceived} and passed to {@code handlePacketInMessage} together with its match.
 *
 * @param packetInMessage packet-in message received from the switch
 */
365 public void processPacketInMessage(final PacketInMessage packetInMessage) {
366 if (isMasterOfDevice()) {
367 final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
368 handlePacketInMessage(packetReceived, packetInMessage.getImplementedInterface(), packetReceived.getMatch());
// NOTE(review): embedded line 369 is not visible; going by its message, the debug log below is
// presumably the not-master branch — confirm.
370 LOG.debug("Controller is not owner of the device {}, skipping packet_in message", deviceInfo.getLOGValue());
/**
 * Checks mastership of the device through its context chain: the result starts as {@code false}
 * and, when a context chain exists, is replaced by
 * {@code contextChain.isMastered(ContextChainMastershipState.CHECK, false)}.
 */
// NOTE(review): the return statement (embedded lines 379-381) is not visible in this listing.
// NOTE(review): the boxed Boolean return type looks unnecessary, callers use it in plain
// conditions — consider a primitive boolean.
374 private Boolean isMasterOfDevice() {
375 final ContextChain contextChain = contextChainHolder.getContextChain(deviceInfo);
376 boolean result = false;
377 if (contextChain != null) {
378 result = contextChain.isMastered(ContextChainMastershipState.CHECK, false);
/**
 * Publishes a packet-in as a {@link PacketReceived} notification, subject to the packet-in rate
 * limiter, and records each step in the message spy.
 *
 * @param packetIn translated packet; a {@code null} value is logged and counted as a translation failure
 * @param implementedInterface message type used for the message spy statistics
 */
// NOTE(review): this listing has several gaps inside this method (embedded lines 385, 392-394,
// 403-404, 408-411, 419-421, 427-429, 435-437, 439, 443-445, 452); the third parameter ("match")
// and the early exits are therefore not visible. Comments below cover the visible lines only.
383 private void handlePacketInMessage(final PacketIn packetIn,
384 final Class<?> implementedInterface,
386 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH);
387 final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
389 if (packetIn == null) {
390 LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
391 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
395 final OpenflowVersion openflowVersion = OpenflowVersion.get(deviceInfo.getVersion());
397 // Try to get ingress from match
// The packet's own ingress wins; otherwise it is derived from the match's in-port.
398 final NodeConnectorRef nodeConnectorRef = packetIn.getIngress() != null
399 ? packetIn.getIngress() : Optional.ofNullable(match)
400 .map(Match::getInPort)
401 .map(nodeConnectorId -> InventoryDataServiceUtil
402 .portNumberfromNodeConnectorId(
405 .map(portNumber -> InventoryDataServiceUtil
406 .nodeConnectorRefFromDatapathIdPortno(
407 deviceInfo.getDatapathId(),
412 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
// No permit from the rate limiter: count the packet as dropped.
414 if (!packetInLimiter.acquirePermit()) {
415 LOG.debug("Packet limited");
416 // TODO: save packet into emergency slot if possible
417 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
418 .FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
422 final ListenableFuture<?> offerNotification = notificationPublishService
423 .offerNotification(new PacketReceivedBuilder(packetIn)
424 .setIngress(nodeConnectorRef)
425 .setMatch(MatchUtil.transformMatch(match,
426 org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.packet.received
// Offer rejected: drain the limiter to its low water mark and give the permit back.
430 if (NotificationPublishService.REJECTED.equals(offerNotification)) {
431 LOG.debug("notification offer rejected");
432 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
433 packetInLimiter.drainLowWaterMark();
434 packetInLimiter.releasePermit();
// Both callback outcomes release the permit; the callback runs on the direct executor.
438 Futures.addCallback(offerNotification, new FutureCallback<Object>() {
440 public void onSuccess(final Object result) {
441 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
442 packetInLimiter.releasePermit();
446 public void onFailure(final Throwable throwable) {
447 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
448 .FROM_SWITCH_NOTIFICATION_REJECTED);
449 LOG.debug("notification offer failed: {}", throwable.getMessage());
450 LOG.trace("notification offer failed..", throwable);
451 packetInLimiter.releasePermit();
453 }, MoreExecutors.directExecutor());
/**
 * Handles an experimenter message: when this controller masters the device, the vendor payload is
 * converted with the extension converter registered for its type and device version, and the result
 * is offered as an {@code ExperimenterMessageFromDev} notification for this node.
 *
 * @param notification experimenter message received from the switch
 */
457 public void processExperimenterMessage(final ExperimenterMessage notification) {
458 if (isMasterOfDevice()) {
460 final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
// The converter is looked up by (device version, implemented interface of the vendor data).
461 final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
462 getDeviceInfo().getVersion(),
463 (Class<? extends ExperimenterDataOfChoice>) vendorData.getImplementedInterface());
464 final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter =
465 extensionConverterProvider.getMessageConverter(key);
466 if (messageConverter == null) {
467 LOG.warn("custom converter for {}[OF:{}] not found",
468 notification.getExperimenterDataOfChoice().getImplementedInterface(),
469 getDeviceInfo().getVersion());
// NOTE(review): embedded lines 470-471 (end of the missing-converter branch) are not visible here.
472 // build notification
473 final ExperimenterMessageOfChoice messageOfChoice;
474 messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
475 final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new
476 ExperimenterMessageFromDevBuilder()
477 .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
478 .setExperimenterMessageOfChoice(messageOfChoice);
480 notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
// NOTE(review): embedded line 481 is not visible; going by its message, the debug log below is
// presumably the not-master branch — confirm.
482 LOG.debug("Controller is not owner of the device {}, skipping experimenter message",
483 deviceInfo.getLOGValue());
/**
 * Handles a message that did not originate from this plugin's own deserialization path (assumption
 * from the method name — confirm). A message whose implemented interface is the packet-service
 * {@code PacketInMessage} is cast and passed to {@code handlePacketInMessage}.
 *
 * @param message message to inspect
 */
// NOTE(review): the return statements (embedded lines 501-506) are not visible in this listing.
488 // The cast to PacketInMessage is safe as the implemented interface is verified before the cast but FB doesn't
490 @SuppressFBWarnings("BC_UNCONFIRMED_CAST")
491 public boolean processAlienMessage(final OfHeader message) {
492 final Class<? extends DataContainer> implementedInterface = message.getImplementedInterface();
494 if (org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketInMessage.class
495 .equals(implementedInterface)) {
496 final org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
497 .PacketInMessage packetInMessage = (org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
498 .rev130709.PacketInMessage) message;
500 handlePacketInMessage(packetInMessage, implementedInterface, packetInMessage.getMatch());
/**
 * Returns the translator library supplied to the constructor.
 *
 * @return the translator library
 */
508 public TranslatorLibrary oook() {
509 return translatorLibrary;
/**
 * Sets the service used to offer flow-removed, packet-in and experimenter notifications.
 *
 * @param notificationPublishService notification publish service to use
 */
513 public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
514 this.notificationPublishService = notificationPublishService;
/**
 * Accessor for the message spy.
 */
// NOTE(review): the method body is not visible in this listing; presumably it returns the
// messageSpy field — confirm.
518 public MessageSpy getMessageSpy() {
/**
 * Switches packet-in filtering off on the primary connection's adapter.
 */
523 public void onPublished() {
524 primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
/**
 * Creates a new multi-message collector bound to this device context and the given request context.
 *
 * @return a new {@code MultiMsgCollectorImpl}
 */
// NOTE(review): the rest of the signature (parameter name "requestContext", embedded line 529) is
// not visible in this listing.
528 public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>>
530 return new MultiMsgCollectorImpl<>(this, requestContext);
/**
 * Re-derives the packet-in limiter's water marks from {@code upperBound}: low is
 * {@code LOW_WATERMARK_FACTOR * upperBound} and high is {@code HIGH_WATERMARK_FACTOR * upperBound},
 * each truncated to {@code int}.
 *
 * @param upperBound new upper bound the water marks are scaled from
 */
534 public void updatePacketInRateLimit(final long upperBound) {
535 packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound),
536 (int) (HIGH_WATERMARK_FACTOR * upperBound));
/**
 * Sets the provider used to look up converters for experimenter messages.
 *
 * @param extensionConverterProvider extension converter provider to keep
 */
540 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
541 this.extensionConverterProvider = extensionConverterProvider;
/**
 * Returns the provider set through {@code setExtensionConverterProvider}, or {@code null} if none
 * was set.
 *
 * @return the kept extension converter provider
 */
545 public ExtensionConverterProvider getExtensionConverterProvider() {
546 return extensionConverterProvider;
/**
 * Package-private accessor for the transaction chain manager; the field is assigned in
 * {@code lazyTransactionManagerInitialization()} and set to {@code null} after {@code close()}
 * finishes shutting the chain down.
 *
 * @return the current transaction chain manager, possibly {@code null}
 */
550 TransactionChainManager getTransactionChainManager() {
551 return this.transactionChainManager;
/**
 * Deactivates the transaction chain manager when this context is initialized and arms a timer that
 * cancels the deactivation if it has not completed within {@code TX_CHAIN_CLOSE_TIMEOUT} milliseconds.
 *
 * @return the deactivation future, or an already completed future when this context is not initialized
 */
555 public ListenableFuture<?> closeServiceInstance() {
556 final ListenableFuture<?> listenableFuture = initialized.get()
557 ? transactionChainManager.deactivateTransactionManager()
558 : Futures.immediateFuture(null);
// Timeout guard: cancel (with interruption) a deactivation that is still pending when the timer fires.
560 hashedWheelTimer.newTimeout((timerTask) -> {
561 if (!listenableFuture.isDone() && !listenableFuture.isCancelled()) {
562 listenableFuture.cancel(true);
564 }, TX_CHAIN_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS);
566 return listenableFuture;
/**
 * Returns the device info obtained from the primary connection context in the constructor.
 *
 * @return the device info
 */
570 public DeviceInfo getDeviceInfo() {
571 return this.deviceInfo;
/**
 * Stores the mastership watcher; {@code instantiateServiceInstance()} hands it to the flow-registry
 * fill callback.
 *
 * @param newWatcher watcher to notify about mastership related events
 */
575 public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher newWatcher) {
576 this.contextChainMastershipWatcher = newWatcher;
/**
 * Returns the service group identifier of the device.
 *
 * @return {@code deviceInfo.getServiceIdentifier()}
 */
581 public ServiceGroupIdentifier getIdentifier() {
582 return deviceInfo.getServiceIdentifier();
/**
 * Closes this context. If it was initialized (the flag is atomically reset), the three registries
 * are closed and the transaction chain manager is asked to shut down; once that completes, with
 * either outcome, the manager is closed and the field is set to {@code null}. Outstanding request
 * contexts are failed with the RPC error "Connection closed." and the set is cleared.
 */
// NOTE(review): embedded lines 608-609 are not visible, so it cannot be told from this listing
// whether the request-context cleanup sits inside or after the "initialized" branch.
586 public void close() {
587 // Close all datastore registries and transactions
588 if (initialized.getAndSet(false)) {
589 deviceGroupRegistry.close();
590 deviceFlowRegistry.close();
591 deviceMeterRegistry.close();
593 final ListenableFuture<?> txChainShuttingDown = transactionChainManager.shuttingDown();
// Success and failure are handled identically; the callback runs on the direct executor.
595 Futures.addCallback(txChainShuttingDown, new FutureCallback<Object>() {
597 public void onSuccess(final Object result) {
598 transactionChainManager.close();
599 transactionChainManager = null;
603 public void onFailure(final Throwable throwable) {
604 transactionChainManager.close();
605 transactionChainManager = null;
607 }, MoreExecutors.directExecutor());
610 requestContexts.forEach(requestContext -> RequestContextUtil
611 .closeRequestContextWithRpcError(requestContext, "Connection closed."));
612 requestContexts.clear();
/**
 * Tells whether single-layer serialization may be used for this device.
 *
 * @return {@code true} only when the feature flag is set and the device's version is at least
 *         {@code OFConstants.OFP_VERSION_1_3}
 */
616 public boolean canUseSingleLayerSerialization() {
617 return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
/**
 * Brings the device context into service: initializes the transaction chain manager and registries,
 * writes the port-status messages buffered by the primary connection context, runs the version
 * specific device initializer (waiting at most {@code DEVICE_INIT_TIMEOUT} milliseconds) and finally
 * starts filling the flow registry, with a {@code DeviceFlowRegistryCallback} attached to the result.
 *
 * <p>Failures in any of the visible steps are rethrown as {@link RuntimeException} with the
 * original exception as cause; an unsupported device version also results in a
 * {@link RuntimeException}.
 */
// NOTE(review): this listing has gaps inside this method (embedded lines 625-626, 629, 631,
// 635-636, 642, 644-645, 654-655, 659-660), so some statements — including the openings of the try
// blocks and the call made on the initializer Optional — are not visible here.
620 // TODO: exception handling should be fixed by using custom checked exception, never RuntimeExceptions
622 @SuppressWarnings({"checkstyle:IllegalCatch"})
623 public void instantiateServiceInstance() {
624 lazyTransactionManagerInitialization();
// Port-status messages received before this point were buffered by the connection context.
627 final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
628 .retrieveAndClearPortStatusMessages();
630 portStatusMessages.forEach(this::writePortStatusMessage);
632 } catch (final Exception ex) {
633 throw new RuntimeException(String.format("Error processing port status messages from device %s: %s",
634 deviceInfo.toString(), ex.toString()), ex);
637 final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
638 .lookup(deviceInfo.getVersion());
640 if (initializer.isPresent()) {
641 final Future<Void> initialize = initializer
643 .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor);
646 initialize.get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
647 } catch (TimeoutException ex) {
// On timeout the pending initialization is cancelled before the failure is reported.
648 initialize.cancel(true);
649 throw new RuntimeException(String.format("Failed to initialize device %s in %ss: %s",
650 deviceInfo.toString(), String.valueOf(DEVICE_INIT_TIMEOUT / 1000), ex.toString()), ex);
// NOTE(review): InterruptedException is wrapped without restoring the thread's interrupt flag —
// consider calling Thread.currentThread().interrupt() before rethrowing.
651 } catch (ExecutionException | InterruptedException ex) {
652 throw new RuntimeException(
653 String.format("Device %s cannot be initialized: %s", deviceInfo.toString(), ex.toString()), ex);
656 throw new RuntimeException(String.format("Unsupported version %s for device %s",
657 deviceInfo.getVersion(),
658 deviceInfo.toString()));
// The fill callback runs on the direct executor and receives the registered mastership watcher.
661 final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill =
662 getDeviceFlowRegistry().fill();
663 Futures.addCallback(deviceFlowRegistryFill,
664 new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher),
665 MoreExecutors.directExecutor());
/**
 * Creates the transaction chain manager and the flow, group and meter registries when this context
 * is not yet initialized, activates the transaction chain manager and marks the context initialized.
 */
// NOTE(review): embedded lines 673 and 679-680 are not visible, so the exact extent of the
// "not initialized" branch cannot be told from this listing.
669 void lazyTransactionManagerInitialization() {
670 if (!this.initialized.get()) {
671 if (LOG.isDebugEnabled()) {
672 LOG.debug("Transaction chain manager for node {} created", deviceInfo);
674 this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo.getNodeId().getValue());
675 this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker,
676 deviceInfo.getNodeInstanceIdentifier());
677 this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
678 this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
681 transactionChainManager.activateTransactionManager();
682 initialized.set(true);
/**
 * Creates a request context with a freshly reserved xid and tracks it in {@code requestContexts}
 * so that {@code close()} can fail it later; closing the request context removes it from that set.
 *
 * @return the newly created and registered request context
 */
687 public <T> RequestContext<T> createRequestContext() {
688 final Long xid = deviceInfo.reserveXidForDeviceMessage();
690 final AbstractRequestContext<T> abstractRequestContext = new AbstractRequestContext<T>(xid) {
// Self-deregistration when the request context is closed.
692 public void close() {
693 requestContexts.remove(this);
697 requestContexts.add(abstractRequestContext);
698 return abstractRequestContext;
/**
 * Callback invoked with an acquired context chain state.
 *
 * @param state the acquired state
 */
// NOTE(review): the method body (embedded lines 703-704) is not visible in this listing.
702 public void onStateAcquired(final ContextChainState state) {
706 private class DeviceFlowRegistryCallback implements FutureCallback<List<Optional<FlowCapableNode>>> {
707 private final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill;
708 private final ContextChainMastershipWatcher contextChainMastershipWatcher;
710 DeviceFlowRegistryCallback(
711 ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill,
712 ContextChainMastershipWatcher contextChainMastershipWatcher) {
713 this.deviceFlowRegistryFill = deviceFlowRegistryFill;
714 this.contextChainMastershipWatcher = contextChainMastershipWatcher;
718 public void onSuccess(List<Optional<FlowCapableNode>> result) {
719 if (LOG.isDebugEnabled()) {
720 // Count all flows we read from datastore for debugging purposes.
721 // This number do not always represent how many flows were actually added
722 // to DeviceFlowRegistry, because of possible duplicates.
723 long flowCount = Optional.ofNullable(result)
724 .map(Collections::singleton)
725 .orElse(Collections.emptySet())
727 .flatMap(Collection::stream)
728 .filter(Objects::nonNull)
729 .flatMap(flowCapableNodeOptional
730 -> com.google.common.base.Optional.fromJavaUtil(flowCapableNodeOptional).asSet().stream())
731 .filter(Objects::nonNull)
732 .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
733 .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
734 .filter(Objects::nonNull)
735 .filter(table -> Objects.nonNull(table.getFlow()))
736 .flatMap(table -> table.getFlow().stream())
737 .filter(Objects::nonNull)
740 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo);
742 this.contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState
743 .INITIAL_FLOW_REGISTRY_FILL);
747 public void onFailure(Throwable throwable) {
748 if (deviceFlowRegistryFill.isCancelled()) {
749 if (LOG.isDebugEnabled()) {
750 LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo);
753 LOG.warn("Failed filling flow registry with flows for node: {}", deviceInfo, throwable);
755 contextChainMastershipWatcher.onNotAbleToStartMastership(
757 "Was not able to fill flow registry on device",