2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import io.netty.util.HashedWheelTimer;
17 import java.util.Collection;
18 import java.util.Collections;
19 import java.util.List;
20 import java.util.Objects;
21 import java.util.Optional;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27 import java.util.concurrent.atomic.AtomicBoolean;
28 import javax.annotation.Nonnull;
29 import javax.annotation.Nullable;
30 import org.opendaylight.mdsal.binding.api.DataBroker;
31 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
32 import org.opendaylight.mdsal.binding.api.ReadTransaction;
33 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
34 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
35 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
36 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
37 import org.opendaylight.openflowplugin.api.OFConstants;
38 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
42 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
43 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
44 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
45 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
46 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
48 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder;
49 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
50 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
51 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
52 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
53 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
54 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
55 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
56 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
57 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
58 import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
59 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
60 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
61 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
62 import org.opendaylight.openflowplugin.extension.api.exception.ConversionException;
63 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
64 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
65 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
66 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
67 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
68 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
69 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
70 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
71 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
72 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
73 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
74 import org.opendaylight.openflowplugin.impl.util.MatchUtil;
75 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
76 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.Match;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketIn;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
102 import org.opendaylight.yangtools.yang.binding.DataContainer;
103 import org.opendaylight.yangtools.yang.binding.DataObject;
104 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
105 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
106 import org.slf4j.Logger;
107 import org.slf4j.LoggerFactory;
109 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper {
111 private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
113 // TODO: drain factor should be parametrized
114 private static final float REJECTED_DRAIN_FACTOR = 0.25f;
115 // TODO: low water mark factor should be parametrized
116 private static final float LOW_WATERMARK_FACTOR = 0.75f;
117 // TODO: high water mark factor should be parametrized
118 private static final float HIGH_WATERMARK_FACTOR = 0.95f;
120 // Timeout in milliseconds after what we will give up on initializing device
121 private static final int DEVICE_INIT_TIMEOUT = 9000;
123 // Timeout in milliseconds after what we will give up on closing transaction chain
124 private static final int TX_CHAIN_CLOSE_TIMEOUT = 10000;
126 private static final int LOW_WATERMARK = 1000;
127 private static final int HIGH_WATERMARK = 2000;
129 private final MultipartWriterProvider writerProvider;
130 private final HashedWheelTimer hashedWheelTimer;
131 private final DeviceState deviceState;
132 private final DataBroker dataBroker;
133 private final Collection<RequestContext<?>> requestContexts = ConcurrentHashMap.newKeySet();
134 private final MessageSpy messageSpy;
135 private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
136 private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
137 private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight
138 .flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
139 private final TranslatorLibrary translatorLibrary;
140 private final ConvertorExecutor convertorExecutor;
141 private final DeviceInitializerProvider deviceInitializerProvider;
142 private final PacketInRateLimiter packetInLimiter;
143 private final DeviceInfo deviceInfo;
144 private final ConnectionContext primaryConnectionContext;
145 private final boolean skipTableFeatures;
146 private final boolean switchFeaturesMandatory;
147 private final boolean isFlowRemovedNotificationOn;
148 private final boolean useSingleLayerSerialization;
149 private final AtomicBoolean initialized = new AtomicBoolean(false);
150 private final AtomicBoolean hasState = new AtomicBoolean(false);
151 private final AtomicBoolean isInitialTransactionSubmitted = new AtomicBoolean(false);
152 private final ContextChainHolder contextChainHolder;
153 private NotificationPublishService notificationPublishService;
154 private TransactionChainManager transactionChainManager;
155 private DeviceFlowRegistry deviceFlowRegistry;
156 private DeviceGroupRegistry deviceGroupRegistry;
157 private DeviceMeterRegistry deviceMeterRegistry;
158 private ExtensionConverterProvider extensionConverterProvider;
159 private ContextChainMastershipWatcher contextChainMastershipWatcher;
161 DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
162 @Nonnull final DataBroker dataBroker,
163 @Nonnull final MessageSpy messageSpy,
164 @Nonnull final TranslatorLibrary translatorLibrary,
165 final ConvertorExecutor convertorExecutor,
166 final boolean skipTableFeatures,
167 final HashedWheelTimer hashedWheelTimer,
168 final boolean useSingleLayerSerialization,
169 final DeviceInitializerProvider deviceInitializerProvider,
170 final boolean isFlowRemovedNotificationOn,
171 final boolean switchFeaturesMandatory,
172 final ContextChainHolder contextChainHolder) {
174 this.primaryConnectionContext = primaryConnectionContext;
175 this.deviceInfo = primaryConnectionContext.getDeviceInfo();
176 this.hashedWheelTimer = hashedWheelTimer;
177 this.deviceInitializerProvider = deviceInitializerProvider;
178 this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
179 this.switchFeaturesMandatory = switchFeaturesMandatory;
180 this.deviceState = new DeviceStateImpl();
181 this.dataBroker = dataBroker;
182 this.messageSpy = messageSpy;
183 this.contextChainHolder = contextChainHolder;
185 this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
186 /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
188 this.translatorLibrary = translatorLibrary;
189 this.portStatusTranslator = translatorLibrary.lookupTranslator(
190 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
191 this.packetInTranslator = translatorLibrary.lookupTranslator(
192 new TranslatorKey(deviceInfo.getVersion(), org.opendaylight.yang.gen.v1.urn.opendaylight.openflow
194 .PacketIn.class.getName()));
195 this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
196 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
198 this.convertorExecutor = convertorExecutor;
199 this.skipTableFeatures = skipTableFeatures;
200 this.useSingleLayerSerialization = useSingleLayerSerialization;
201 writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
205 public boolean initialSubmitTransaction() {
206 if (!initialized.get()) {
210 final boolean initialSubmit = transactionChainManager.initialSubmitWriteTransaction();
211 isInitialTransactionSubmitted.set(initialSubmit);
212 return initialSubmit;
216 public DeviceState getDeviceState() {
221 public ReadTransaction getReadTransaction() {
222 return dataBroker.newReadOnlyTransaction();
226 public boolean isTransactionsEnabled() {
227 return isInitialTransactionSubmitted.get();
231 public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
232 final InstanceIdentifier<T> path,
234 if (initialized.get()) {
235 transactionChainManager.writeToTransaction(store, path, data, false);
240 public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
241 final InstanceIdentifier<T> path,
243 if (initialized.get()) {
244 transactionChainManager.writeToTransaction(store, path, data, true);
249 public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store,
250 final InstanceIdentifier<T> path) {
251 if (initialized.get()) {
252 transactionChainManager.addDeleteOperationToTxChain(store, path);
257 public boolean submitTransaction() {
258 return initialized.get() && transactionChainManager.submitTransaction();
262 public boolean syncSubmitTransaction() {
263 return initialized.get() && transactionChainManager.submitTransaction(true);
267 public ConnectionContext getPrimaryConnectionContext() {
268 return primaryConnectionContext;
272 public DeviceFlowRegistry getDeviceFlowRegistry() {
273 return deviceFlowRegistry;
277 public DeviceGroupRegistry getDeviceGroupRegistry() {
278 return deviceGroupRegistry;
282 public DeviceMeterRegistry getDeviceMeterRegistry() {
283 return deviceMeterRegistry;
287 public void processReply(final OfHeader ofHeader) {
288 messageSpy.spyMessage(
289 ofHeader.getImplementedInterface(),
290 ofHeader instanceof Error
291 ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
292 : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
296 public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
297 ofHeaderList.forEach(header -> messageSpy.spyMessage(
298 header.getImplementedInterface(),
299 header instanceof Error
300 ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
301 : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
305 public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
306 if (isMasterOfDevice()) {
307 //1. translate to general flow (table, priority, match, cookie)
308 final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819
309 .FlowRemoved flowRemovedNotification = flowRemovedTranslator
310 .translate(flowRemoved, deviceInfo, null);
312 if (isFlowRemovedNotificationOn) {
313 // Trigger off a notification
314 notificationPublishService.offerNotification(flowRemovedNotification);
317 LOG.debug("Controller is not owner of the device {}, skipping Flow Removed message",
318 deviceInfo.getLOGValue());
323 @SuppressWarnings("checkstyle:IllegalCatch")
324 public void processPortStatusMessage(final PortStatusMessage portStatus) {
325 messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.StatisticsGroup
326 .FROM_SWITCH_PUBLISHED_SUCCESS);
328 if (initialized.get()) {
330 writePortStatusMessage(portStatus);
331 } catch (final Exception e) {
332 LOG.warn("Error processing port status message for port {} on device {}",
333 portStatus.getPortNo(), getDeviceInfo(), e);
335 } else if (!hasState.get()) {
336 primaryConnectionContext.handlePortStatusMessage(portStatus);
340 private void writePortStatusMessage(final PortStatus portStatusMessage) {
341 final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
342 .translate(portStatusMessage, getDeviceInfo(), null);
344 final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
345 .getNodeInstanceIdentifier()
346 .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
347 .nodeConnectorIdfromDatapathPortNo(
348 deviceInfo.getDatapathId(),
349 portStatusMessage.getPortNo(),
350 OpenflowVersion.get(deviceInfo.getVersion()))));
352 writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
353 .withKey(iiToNodeConnector.getKey())
354 .addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new
355 FlowCapableNodeConnectorStatisticsDataBuilder().build())
356 .addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector)
358 syncSubmitTransaction();
359 if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
360 addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
361 syncSubmitTransaction();
366 public void processPacketInMessage(final PacketInMessage packetInMessage) {
367 if (isMasterOfDevice()) {
368 final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
369 handlePacketInMessage(packetReceived, packetInMessage.getImplementedInterface(), packetReceived.getMatch());
371 LOG.debug("Controller is not owner of the device {}, skipping packet_in message", deviceInfo.getLOGValue());
375 private Boolean isMasterOfDevice() {
376 final ContextChain contextChain = contextChainHolder.getContextChain(deviceInfo);
377 boolean result = false;
378 if (contextChain != null) {
379 result = contextChain.isMastered(ContextChainMastershipState.CHECK, false);
384 private void handlePacketInMessage(final PacketIn packetIn,
385 final Class<?> implementedInterface,
387 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH);
388 final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
390 if (packetIn == null) {
391 LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
392 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
396 final OpenflowVersion openflowVersion = OpenflowVersion.get(deviceInfo.getVersion());
398 // Try to get ingress from match
399 final NodeConnectorRef nodeConnectorRef = Objects.nonNull(packetIn.getIngress())
400 ? packetIn.getIngress() : Optional.ofNullable(match)
401 .map(Match::getInPort)
402 .map(nodeConnectorId -> InventoryDataServiceUtil
403 .portNumberfromNodeConnectorId(
406 .map(portNumber -> InventoryDataServiceUtil
407 .nodeConnectorRefFromDatapathIdPortno(
408 deviceInfo.getDatapathId(),
413 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
415 if (!packetInLimiter.acquirePermit()) {
416 LOG.debug("Packet limited");
417 // TODO: save packet into emergency slot if possible
418 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
419 .FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
423 final ListenableFuture<?> offerNotification = notificationPublishService
424 .offerNotification(new PacketReceivedBuilder(packetIn)
425 .setIngress(nodeConnectorRef)
426 .setMatch(MatchUtil.transformMatch(match,
427 org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.packet.received
431 if (NotificationPublishService.REJECTED.equals(offerNotification)) {
432 LOG.debug("notification offer rejected");
433 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
434 packetInLimiter.drainLowWaterMark();
435 packetInLimiter.releasePermit();
439 Futures.addCallback(offerNotification, new FutureCallback<Object>() {
441 public void onSuccess(final Object result) {
442 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
443 packetInLimiter.releasePermit();
447 public void onFailure(final Throwable throwable) {
448 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
449 .FROM_SWITCH_NOTIFICATION_REJECTED);
450 LOG.debug("notification offer failed: {}", throwable.getMessage());
451 LOG.trace("notification offer failed..", throwable);
452 packetInLimiter.releasePermit();
454 }, MoreExecutors.directExecutor());
458 public void processExperimenterMessage(final ExperimenterMessage notification) {
459 if (isMasterOfDevice()) {
461 final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
462 final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
463 getDeviceInfo().getVersion(),
464 (Class<? extends ExperimenterDataOfChoice>) vendorData.getImplementedInterface());
465 final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter =
466 extensionConverterProvider.getMessageConverter(key);
467 if (messageConverter == null) {
468 LOG.warn("custom converter for {}[OF:{}] not found",
469 notification.getExperimenterDataOfChoice().getImplementedInterface(),
470 getDeviceInfo().getVersion());
473 // build notification
474 final ExperimenterMessageOfChoice messageOfChoice;
476 messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
477 final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new
478 ExperimenterMessageFromDevBuilder()
479 .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
480 .setExperimenterMessageOfChoice(messageOfChoice);
482 notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
483 } catch (final ConversionException e) {
484 LOG.error("Conversion of experimenter notification failed", e);
487 LOG.debug("Controller is not owner of the device {}, skipping experimenter message",
488 deviceInfo.getLOGValue());
493 // The cast to PacketInMessage is safe as the implemented interface is verified before the cas tbut FB doesn't
495 @SuppressFBWarnings("BC_UNCONFIRMED_CAST")
496 public boolean processAlienMessage(final OfHeader message) {
497 final Class<? extends DataContainer> implementedInterface = message.getImplementedInterface();
499 if (Objects.nonNull(implementedInterface) && org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
500 .rev130709.PacketInMessage.class.equals(implementedInterface)) {
501 final org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
502 .PacketInMessage packetInMessage = (org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
503 .rev130709.PacketInMessage) message;
505 handlePacketInMessage(packetInMessage, implementedInterface, packetInMessage.getMatch());
513 public TranslatorLibrary oook() {
514 return translatorLibrary;
518 public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
519 this.notificationPublishService = notificationPublishService;
523 public MessageSpy getMessageSpy() {
528 public void onPublished() {
529 primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
533 public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>>
535 return new MultiMsgCollectorImpl<>(this, requestContext);
539 public void updatePacketInRateLimit(final long upperBound) {
540 packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound),
541 (int) (HIGH_WATERMARK_FACTOR * upperBound));
545 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
546 this.extensionConverterProvider = extensionConverterProvider;
550 public ExtensionConverterProvider getExtensionConverterProvider() {
551 return extensionConverterProvider;
555 TransactionChainManager getTransactionChainManager() {
556 return this.transactionChainManager;
560 public ListenableFuture<?> closeServiceInstance() {
561 final ListenableFuture<?> listenableFuture = initialized.get()
562 ? transactionChainManager.deactivateTransactionManager()
563 : Futures.immediateFuture(null);
565 hashedWheelTimer.newTimeout((timerTask) -> {
566 if (!listenableFuture.isDone() && !listenableFuture.isCancelled()) {
567 listenableFuture.cancel(true);
569 }, TX_CHAIN_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS);
571 return listenableFuture;
575 public DeviceInfo getDeviceInfo() {
576 return this.deviceInfo;
580 public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher newWatcher) {
581 this.contextChainMastershipWatcher = newWatcher;
586 public ServiceGroupIdentifier getIdentifier() {
587 return deviceInfo.getServiceIdentifier();
591 public void close() {
592 // Close all datastore registries and transactions
593 if (initialized.getAndSet(false)) {
594 deviceGroupRegistry.close();
595 deviceFlowRegistry.close();
596 deviceMeterRegistry.close();
598 final ListenableFuture<?> txChainShuttingDown = transactionChainManager.shuttingDown();
600 Futures.addCallback(txChainShuttingDown, new FutureCallback<Object>() {
602 public void onSuccess(final Object result) {
603 transactionChainManager.close();
604 transactionChainManager = null;
608 public void onFailure(final Throwable throwable) {
609 transactionChainManager.close();
610 transactionChainManager = null;
612 }, MoreExecutors.directExecutor());
615 requestContexts.forEach(requestContext -> RequestContextUtil
616 .closeRequestContextWithRpcError(requestContext, "Connection closed."));
617 requestContexts.clear();
621 public boolean canUseSingleLayerSerialization() {
622 return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
625 // TODO: exception handling should be fixed by using custom checked exception, never RuntimeExceptions
627 @SuppressWarnings({"checkstyle:IllegalCatch"})
628 public void instantiateServiceInstance() {
629 lazyTransactionManagerInitialization();
632 final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
633 .retrieveAndClearPortStatusMessages();
635 portStatusMessages.forEach(this::writePortStatusMessage);
637 } catch (final Exception ex) {
638 throw new RuntimeException(String.format("Error processing port status messages from device %s: %s",
639 deviceInfo.toString(), ex.toString()), ex);
642 final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
643 .lookup(deviceInfo.getVersion());
645 if (initializer.isPresent()) {
646 final Future<Void> initialize = initializer
648 .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor);
651 initialize.get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
652 } catch (TimeoutException ex) {
653 initialize.cancel(true);
654 throw new RuntimeException(String.format("Failed to initialize device %s in %ss: %s",
655 deviceInfo.toString(), String.valueOf(DEVICE_INIT_TIMEOUT / 1000), ex.toString()), ex);
656 } catch (ExecutionException | InterruptedException ex) {
657 throw new RuntimeException(
658 String.format("Device %s cannot be initialized: %s", deviceInfo.toString(), ex.toString()), ex);
661 throw new RuntimeException(String.format("Unsupported version %s for device %s",
662 deviceInfo.getVersion(),
663 deviceInfo.toString()));
666 final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill =
667 getDeviceFlowRegistry().fill();
668 Futures.addCallback(deviceFlowRegistryFill,
669 new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher),
670 MoreExecutors.directExecutor());
674 void lazyTransactionManagerInitialization() {
675 if (!this.initialized.get()) {
676 if (LOG.isDebugEnabled()) {
677 LOG.debug("Transaction chain manager for node {} created", deviceInfo);
679 this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo.getNodeId().getValue());
680 this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker,
681 deviceInfo.getNodeInstanceIdentifier());
682 this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
683 this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
686 transactionChainManager.activateTransactionManager();
687 initialized.set(true);
692 public <T> RequestContext<T> createRequestContext() {
693 final Long xid = deviceInfo.reserveXidForDeviceMessage();
695 final AbstractRequestContext<T> abstractRequestContext = new AbstractRequestContext<T>(xid) {
697 public void close() {
698 requestContexts.remove(this);
702 requestContexts.add(abstractRequestContext);
703 return abstractRequestContext;
707 public void onStateAcquired(final ContextChainState state) {
711 private class DeviceFlowRegistryCallback implements FutureCallback<List<Optional<FlowCapableNode>>> {
712 private final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill;
713 private final ContextChainMastershipWatcher contextChainMastershipWatcher;
715 DeviceFlowRegistryCallback(
716 ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill,
717 ContextChainMastershipWatcher contextChainMastershipWatcher) {
718 this.deviceFlowRegistryFill = deviceFlowRegistryFill;
719 this.contextChainMastershipWatcher = contextChainMastershipWatcher;
723 public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
724 if (LOG.isDebugEnabled()) {
725 // Count all flows we read from datastore for debugging purposes.
726 // This number do not always represent how many flows were actually added
727 // to DeviceFlowRegistry, because of possible duplicates.
728 long flowCount = Optional.ofNullable(result)
729 .map(Collections::singleton)
730 .orElse(Collections.emptySet())
732 .flatMap(Collection::stream)
733 .filter(Objects::nonNull)
734 .flatMap(flowCapableNodeOptional
735 -> com.google.common.base.Optional.fromJavaUtil(flowCapableNodeOptional).asSet().stream())
736 .filter(Objects::nonNull)
737 .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
738 .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
739 .filter(Objects::nonNull)
740 .filter(table -> Objects.nonNull(table.getFlow()))
741 .flatMap(table -> table.getFlow().stream())
742 .filter(Objects::nonNull)
745 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo);
747 this.contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState
748 .INITIAL_FLOW_REGISTRY_FILL);
752 public void onFailure(Throwable throwable) {
753 if (deviceFlowRegistryFill.isCancelled()) {
754 if (LOG.isDebugEnabled()) {
755 LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo);
758 LOG.warn("Failed filling flow registry with flows for node: {}", deviceInfo, throwable);
760 contextChainMastershipWatcher.onNotAbleToStartMastership(
762 "Was not able to fill flow registry on device",