2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import io.netty.util.HashedWheelTimer;
15 import java.util.Collection;
16 import java.util.Collections;
17 import java.util.HashSet;
18 import java.util.List;
19 import java.util.Objects;
20 import java.util.Optional;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import javax.annotation.Nonnull;
27 import javax.annotation.Nullable;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
30 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
33 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
34 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
35 import org.opendaylight.openflowplugin.api.OFConstants;
36 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
40 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
41 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
42 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
43 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
44 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
45 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
46 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
48 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
49 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
50 import org.opendaylight.openflowplugin.api.openflow.registry.ItemLifeCycleRegistry;
51 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
52 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor;
53 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryKey;
54 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
55 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
56 import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleKeeper;
57 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
58 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
59 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
60 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
61 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
62 import org.opendaylight.openflowplugin.extension.api.exception.ConversionException;
63 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
64 import org.opendaylight.openflowplugin.impl.common.ItemLifeCycleSourceImpl;
65 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
66 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
67 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
68 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
69 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
70 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
71 import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
72 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
73 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
74 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
75 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
76 import org.opendaylight.openflowplugin.impl.util.MatchUtil;
77 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
78 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.Match;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
102 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketIn;
103 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
104 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder;
105 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
106 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
107 import org.opendaylight.yangtools.yang.binding.DataContainer;
108 import org.opendaylight.yangtools.yang.binding.DataObject;
109 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
110 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
111 import org.slf4j.Logger;
112 import org.slf4j.LoggerFactory;
114 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper {
116 private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
118 // TODO: drain factor should be parametrized
119 private static final float REJECTED_DRAIN_FACTOR = 0.25f;
120 // TODO: low water mark factor should be parametrized
121 private static final float LOW_WATERMARK_FACTOR = 0.75f;
122 // TODO: high water mark factor should be parametrized
123 private static final float HIGH_WATERMARK_FACTOR = 0.95f;
125 // Timeout in milliseconds after what we will give up on initializing device
126 private static final int DEVICE_INIT_TIMEOUT = 9000;
128 // Timeout in milliseconds after what we will give up on closing transaction chain
129 private static final int TX_CHAIN_CLOSE_TIMEOUT = 10000;
131 private static final int LOW_WATERMARK = 1000;
132 private static final int HIGH_WATERMARK = 2000;
134 private final MultipartWriterProvider writerProvider;
135 private final HashedWheelTimer hashedWheelTimer;
136 private final DeviceState deviceState;
137 private final DataBroker dataBroker;
138 private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
139 private final MessageSpy messageSpy;
140 private final ItemLifeCycleKeeper flowLifeCycleKeeper;
141 private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
142 private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
143 private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight
144 .flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
145 private final TranslatorLibrary translatorLibrary;
146 private final ItemLifeCycleRegistry itemLifeCycleSourceRegistry;
147 private final ConvertorExecutor convertorExecutor;
148 private final DeviceInitializerProvider deviceInitializerProvider;
149 private final PacketInRateLimiter packetInLimiter;
150 private final DeviceInfo deviceInfo;
151 private final ConnectionContext primaryConnectionContext;
152 private final boolean skipTableFeatures;
153 private final boolean switchFeaturesMandatory;
154 private final boolean isFlowRemovedNotificationOn;
155 private final boolean useSingleLayerSerialization;
156 private final AtomicBoolean initialized = new AtomicBoolean(false);
157 private final AtomicBoolean hasState = new AtomicBoolean(false);
158 private final AtomicBoolean isInitialTransactionSubmitted = new AtomicBoolean(false);
159 private NotificationPublishService notificationPublishService;
160 private TransactionChainManager transactionChainManager;
161 private DeviceFlowRegistry deviceFlowRegistry;
162 private DeviceGroupRegistry deviceGroupRegistry;
163 private DeviceMeterRegistry deviceMeterRegistry;
164 private ExtensionConverterProvider extensionConverterProvider;
165 private ContextChainMastershipWatcher contextChainMastershipWatcher;
167 DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
168 @Nonnull final DataBroker dataBroker,
169 @Nonnull final MessageSpy messageSpy,
170 @Nonnull final TranslatorLibrary translatorLibrary,
171 final ConvertorExecutor convertorExecutor,
172 final boolean skipTableFeatures,
173 final HashedWheelTimer hashedWheelTimer,
174 final boolean useSingleLayerSerialization,
175 final DeviceInitializerProvider deviceInitializerProvider,
176 final boolean isFlowRemovedNotificationOn,
177 final boolean switchFeaturesMandatory) {
179 this.primaryConnectionContext = primaryConnectionContext;
180 this.deviceInfo = primaryConnectionContext.getDeviceInfo();
181 this.hashedWheelTimer = hashedWheelTimer;
182 this.deviceInitializerProvider = deviceInitializerProvider;
183 this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
184 this.switchFeaturesMandatory = switchFeaturesMandatory;
185 this.deviceState = new DeviceStateImpl();
186 this.dataBroker = dataBroker;
187 this.messageSpy = messageSpy;
189 this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
190 /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
192 this.translatorLibrary = translatorLibrary;
193 this.portStatusTranslator = translatorLibrary.lookupTranslator(
194 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
195 this.packetInTranslator = translatorLibrary.lookupTranslator(
196 new TranslatorKey(deviceInfo.getVersion(), org.opendaylight.yang.gen.v1.urn.opendaylight.openflow
198 .PacketIn.class.getName()));
199 this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
200 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
202 this.itemLifeCycleSourceRegistry = new ItemLifeCycleRegistryImpl();
203 this.flowLifeCycleKeeper = new ItemLifeCycleSourceImpl();
204 this.itemLifeCycleSourceRegistry.registerLifeCycleSource(flowLifeCycleKeeper);
205 this.convertorExecutor = convertorExecutor;
206 this.skipTableFeatures = skipTableFeatures;
207 this.useSingleLayerSerialization = useSingleLayerSerialization;
208 writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
212 public boolean initialSubmitTransaction() {
213 if (!initialized.get()) {
217 final boolean initialSubmit = transactionChainManager.initialSubmitWriteTransaction();
218 isInitialTransactionSubmitted.set(initialSubmit);
219 return initialSubmit;
223 public DeviceState getDeviceState() {
228 public ReadOnlyTransaction getReadTransaction() {
229 return dataBroker.newReadOnlyTransaction();
233 public boolean isTransactionsEnabled() {
234 return isInitialTransactionSubmitted.get();
238 public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
239 final InstanceIdentifier<T> path,
241 if (initialized.get()) {
242 transactionChainManager.writeToTransaction(store, path, data, false);
247 public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
248 final InstanceIdentifier<T> path,
250 if (initialized.get()) {
251 transactionChainManager.writeToTransaction(store, path, data, true);
256 public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store,
257 final InstanceIdentifier<T> path) {
258 if (initialized.get()) {
259 transactionChainManager.addDeleteOperationTotTxChain(store, path);
264 public boolean submitTransaction() {
265 return initialized.get() && transactionChainManager.submitWriteTransaction();
269 public ConnectionContext getPrimaryConnectionContext() {
270 return primaryConnectionContext;
274 public DeviceFlowRegistry getDeviceFlowRegistry() {
275 return deviceFlowRegistry;
279 public DeviceGroupRegistry getDeviceGroupRegistry() {
280 return deviceGroupRegistry;
284 public DeviceMeterRegistry getDeviceMeterRegistry() {
285 return deviceMeterRegistry;
289 public void processReply(final OfHeader ofHeader) {
290 messageSpy.spyMessage(
291 ofHeader.getImplementedInterface(),
292 (ofHeader instanceof Error)
293 ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
294 : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
298 public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
299 ofHeaderList.forEach(header -> messageSpy.spyMessage(
300 header.getImplementedInterface(),
301 (header instanceof Error)
302 ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
303 : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
307 public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
308 //1. translate to general flow (table, priority, match, cookie)
309 final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification =
310 flowRemovedTranslator.translate(flowRemoved, deviceInfo, null);
312 if (isFlowRemovedNotificationOn) {
313 // Trigger off a notification
314 notificationPublishService.offerNotification(flowRemovedNotification);
317 final ItemLifecycleListener itemLifecycleListener = flowLifeCycleKeeper.getItemLifecycleListener();
318 if (itemLifecycleListener != null) {
319 //2. create registry key
320 final FlowRegistryKey flowRegKey = FlowRegistryKeyFactory.create(getDeviceInfo().getVersion(),
321 flowRemovedNotification);
323 final FlowDescriptor flowDescriptor = deviceFlowRegistry.retrieveDescriptor(flowRegKey);
324 //4. if flowId present:
325 if (flowDescriptor != null) {
326 // a) construct flow path
327 final KeyedInstanceIdentifier<Flow, FlowKey> flowPath = getDeviceInfo().getNodeInstanceIdentifier()
328 .augmentation(FlowCapableNode.class)
329 .child(Table.class, flowDescriptor.getTableKey())
330 .child(Flow.class, new FlowKey(flowDescriptor.getFlowId()));
331 // b) notify listener
332 itemLifecycleListener.onRemoved(flowPath);
334 LOG.debug("flow id not found: nodeId={} tableId={}, priority={}",
335 getDeviceInfo().getNodeId(), flowRegKey.getTableId(), flowRemovedNotification.getPriority());
341 @SuppressWarnings("checkstyle:IllegalCatch")
342 public void processPortStatusMessage(final PortStatusMessage portStatus) {
343 messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.StatisticsGroup
344 .FROM_SWITCH_PUBLISHED_SUCCESS);
346 if (initialized.get()) {
348 writePortStatusMessage(portStatus);
350 } catch (final Exception e) {
351 LOG.warn("Error processing port status message for port {} on device {}",
352 portStatus.getPortNo(), getDeviceInfo(), e);
354 } else if (!hasState.get()) {
355 primaryConnectionContext.handlePortStatusMessage(portStatus);
359 private void writePortStatusMessage(final PortStatus portStatusMessage) {
360 final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
361 .translate(portStatusMessage, getDeviceInfo(), null);
363 final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
364 .getNodeInstanceIdentifier()
365 .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
366 .nodeConnectorIdfromDatapathPortNo(
367 deviceInfo.getDatapathId(),
368 portStatusMessage.getPortNo(),
369 OpenflowVersion.get(deviceInfo.getVersion()))));
371 if (PortReason.OFPPRADD.equals(portStatusMessage.getReason())
372 || PortReason.OFPPRMODIFY.equals(portStatusMessage.getReason())) {
373 // because of ADD status node connector has to be created
374 writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
375 .setKey(iiToNodeConnector.getKey())
376 .addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new
377 FlowCapableNodeConnectorStatisticsDataBuilder().build())
378 .addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector)
380 } else if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
381 addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
386 public void processPacketInMessage(final PacketInMessage packetInMessage) {
387 final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
388 handlePacketInMessage(packetReceived, packetInMessage.getImplementedInterface(), packetReceived.getMatch());
391 private void handlePacketInMessage(final PacketIn packetIn,
392 final Class<?> implementedInterface,
394 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH);
395 final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
397 if (packetIn == null) {
398 LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
399 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
403 final OpenflowVersion openflowVersion = OpenflowVersion.get(deviceInfo.getVersion());
405 // Try to get ingress from match
406 final NodeConnectorRef nodeConnectorRef = Objects.nonNull(packetIn.getIngress())
407 ? packetIn.getIngress() : Optional.ofNullable(match)
408 .map(Match::getInPort)
409 .map(nodeConnectorId -> InventoryDataServiceUtil
410 .portNumberfromNodeConnectorId(
413 .map(portNumber -> InventoryDataServiceUtil
414 .nodeConnectorRefFromDatapathIdPortno(
415 deviceInfo.getDatapathId(),
420 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
422 if (!packetInLimiter.acquirePermit()) {
423 LOG.debug("Packet limited");
424 // TODO: save packet into emergency slot if possible
425 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
426 .FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
430 final ListenableFuture<?> offerNotification = notificationPublishService
431 .offerNotification(new PacketReceivedBuilder(packetIn)
432 .setIngress(nodeConnectorRef)
433 .setMatch(MatchUtil.transformMatch(match,
434 org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.packet.received
438 if (NotificationPublishService.REJECTED.equals(offerNotification)) {
439 LOG.debug("notification offer rejected");
440 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
441 packetInLimiter.drainLowWaterMark();
442 packetInLimiter.releasePermit();
446 Futures.addCallback(offerNotification, new FutureCallback<Object>() {
448 public void onSuccess(final Object result) {
449 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
450 packetInLimiter.releasePermit();
454 public void onFailure(final Throwable throwable) {
455 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
456 .FROM_SWITCH_NOTIFICATION_REJECTED);
457 LOG.debug("notification offer failed: {}", throwable.getMessage());
458 LOG.trace("notification offer failed..", throwable);
459 packetInLimiter.releasePermit();
465 public void processExperimenterMessage(final ExperimenterMessage notification) {
467 final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
468 final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
469 getDeviceInfo().getVersion(),
470 (Class<? extends ExperimenterDataOfChoice>) vendorData.getImplementedInterface());
471 final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter =
472 extensionConverterProvider.getMessageConverter(key);
473 if (messageConverter == null) {
474 LOG.warn("custom converter for {}[OF:{}] not found",
475 notification.getExperimenterDataOfChoice().getImplementedInterface(),
476 getDeviceInfo().getVersion());
479 // build notification
480 final ExperimenterMessageOfChoice messageOfChoice;
482 messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
483 final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new
484 ExperimenterMessageFromDevBuilder()
485 .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
486 .setExperimenterMessageOfChoice(messageOfChoice);
488 notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
489 } catch (final ConversionException e) {
490 LOG.error("Conversion of experimenter notification failed", e);
495 public boolean processAlienMessage(final OfHeader message) {
496 final Class<? extends DataContainer> implementedInterface = message.getImplementedInterface();
498 if (Objects.nonNull(implementedInterface) && org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
499 .rev130709.PacketInMessage.class.equals(implementedInterface)) {
500 final org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
501 .PacketInMessage packetInMessage = org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
502 .rev130709.PacketInMessage.class.cast(message);
504 handlePacketInMessage(packetInMessage, implementedInterface, packetInMessage.getMatch());
512 public TranslatorLibrary oook() {
513 return translatorLibrary;
517 public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
518 this.notificationPublishService = notificationPublishService;
522 public MessageSpy getMessageSpy() {
527 public void onPublished() {
528 primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
532 public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>>
534 return new MultiMsgCollectorImpl<>(this, requestContext);
538 public void updatePacketInRateLimit(final long upperBound) {
539 packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound),
540 (int) (HIGH_WATERMARK_FACTOR * upperBound));
544 public ItemLifeCycleRegistry getItemLifeCycleSourceRegistry() {
545 return itemLifeCycleSourceRegistry;
549 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
550 this.extensionConverterProvider = extensionConverterProvider;
554 public ExtensionConverterProvider getExtensionConverterProvider() {
555 return extensionConverterProvider;
559 TransactionChainManager getTransactionChainManager() {
560 return this.transactionChainManager;
564 public ListenableFuture<Void> closeServiceInstance() {
565 final ListenableFuture<Void> listenableFuture = initialized.get()
566 ? transactionChainManager.deactivateTransactionManager()
567 : Futures.immediateFuture(null);
569 hashedWheelTimer.newTimeout((timerTask) -> {
570 if (!listenableFuture.isDone() && !listenableFuture.isCancelled()) {
571 listenableFuture.cancel(true);
573 }, TX_CHAIN_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS);
575 return listenableFuture;
579 public DeviceInfo getDeviceInfo() {
580 return this.deviceInfo;
584 public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher) {
585 this.contextChainMastershipWatcher = contextChainMastershipWatcher;
590 public ServiceGroupIdentifier getIdentifier() {
591 return deviceInfo.getServiceIdentifier();
595 public void close() {
596 // Close all datastore registries and transactions
597 if (initialized.getAndSet(false)) {
598 deviceGroupRegistry.close();
599 deviceFlowRegistry.close();
600 deviceMeterRegistry.close();
602 final ListenableFuture<Void> txChainShuttingDown = transactionChainManager.shuttingDown();
604 Futures.addCallback(txChainShuttingDown, new FutureCallback<Void>() {
606 public void onSuccess(@Nullable final Void result) {
607 transactionChainManager.close();
608 transactionChainManager = null;
612 public void onFailure(final Throwable throwable) {
613 transactionChainManager.close();
614 transactionChainManager = null;
619 requestContexts.forEach(requestContext -> RequestContextUtil
620 .closeRequestContextWithRpcError(requestContext, "Connection closed."));
621 requestContexts.clear();
625 public boolean canUseSingleLayerSerialization() {
626 return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
630 @SuppressWarnings("checkstyle:IllegalCatch")
631 public void instantiateServiceInstance() {
632 lazyTransactionManagerInitialization();
635 final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
636 .retrieveAndClearPortStatusMessages();
638 portStatusMessages.forEach(this::writePortStatusMessage);
640 } catch (final Exception ex) {
641 throw new RuntimeException(String.format("Error processing port status messages from device %s: %s",
642 deviceInfo.toString(),
646 final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
647 .lookup(deviceInfo.getVersion());
649 if (initializer.isPresent()) {
650 final Future<Void> initialize = initializer
652 .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor);
655 initialize.get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
656 } catch (TimeoutException ex) {
657 initialize.cancel(true);
658 throw new RuntimeException(String.format("Failed to initialize device %s in %ss: %s",
659 deviceInfo.toString(),
660 String.valueOf(DEVICE_INIT_TIMEOUT / 1000),
662 } catch (ExecutionException | InterruptedException ex) {
663 throw new RuntimeException(String.format("Device %s cannot be initialized: %s",
664 deviceInfo.toString(),
668 throw new RuntimeException(String.format("Unsupported version %s for device %s",
669 deviceInfo.getVersion(),
670 deviceInfo.toString()));
673 final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill =
674 getDeviceFlowRegistry().fill();
675 Futures.addCallback(deviceFlowRegistryFill,
676 new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher));
680 void lazyTransactionManagerInitialization() {
681 if (!this.initialized.get()) {
682 if (LOG.isDebugEnabled()) {
683 LOG.debug("Transaction chain manager for node {} created", deviceInfo);
685 this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo);
686 this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker, deviceInfo
687 .getNodeInstanceIdentifier());
688 this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
689 this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
692 transactionChainManager.activateTransactionManager();
693 initialized.set(true);
698 public <T> RequestContext<T> createRequestContext() {
699 final Long xid = deviceInfo.reserveXidForDeviceMessage();
701 final AbstractRequestContext<T> abstractRequestContext = new AbstractRequestContext<T>(xid) {
703 public void close() {
704 requestContexts.remove(this);
708 requestContexts.add(abstractRequestContext);
709 return abstractRequestContext;
713 public void onStateAcquired(final ContextChainState state) {
717 private class DeviceFlowRegistryCallback implements FutureCallback<List<com.google.common.base
718 .Optional<FlowCapableNode>>> {
719 private final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill;
720 private final ContextChainMastershipWatcher contextChainMastershipWatcher;
722 DeviceFlowRegistryCallback(
723 ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill,
724 ContextChainMastershipWatcher contextChainMastershipWatcher) {
725 this.deviceFlowRegistryFill = deviceFlowRegistryFill;
726 this.contextChainMastershipWatcher = contextChainMastershipWatcher;
730 public void onSuccess(@Nullable List<com.google.common.base.Optional<FlowCapableNode>> result) {
731 if (LOG.isDebugEnabled()) {
732 // Count all flows we read from datastore for debugging purposes.
733 // This number do not always represent how many flows were actually added
734 // to DeviceFlowRegistry, because of possible duplicates.
735 long flowCount = Optional.ofNullable(result)
736 .map(Collections::singleton)
737 .orElse(Collections.emptySet())
739 .flatMap(Collection::stream)
740 .filter(Objects::nonNull)
741 .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
742 .filter(Objects::nonNull)
743 .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
744 .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
745 .filter(Objects::nonNull)
746 .filter(table -> Objects.nonNull(table.getFlow()))
747 .flatMap(table -> table.getFlow().stream())
748 .filter(Objects::nonNull)
751 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo
754 this.contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState
755 .INITIAL_FLOW_REGISTRY_FILL);
759 public void onFailure(Throwable throwable) {
760 if (deviceFlowRegistryFill.isCancelled()) {
761 if (LOG.isDebugEnabled()) {
762 LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo);
765 LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo
768 contextChainMastershipWatcher.onNotAbleToStartMastership(
770 "Was not able to fill flow registry on device",