2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import io.netty.util.HashedWheelTimer;
15 import java.util.Collection;
16 import java.util.Collections;
17 import java.util.HashSet;
18 import java.util.List;
19 import java.util.Objects;
20 import java.util.Optional;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import javax.annotation.Nonnull;
27 import javax.annotation.Nullable;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
30 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
33 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
34 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
35 import org.opendaylight.openflowplugin.api.OFConstants;
36 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
40 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
41 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
42 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
43 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
44 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
45 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
46 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
48 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
49 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
50 import org.opendaylight.openflowplugin.api.openflow.registry.ItemLifeCycleRegistry;
51 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
52 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor;
53 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryKey;
54 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
55 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
56 import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleKeeper;
57 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
58 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
59 import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
60 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
61 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
62 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
63 import org.opendaylight.openflowplugin.extension.api.exception.ConversionException;
64 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
65 import org.opendaylight.openflowplugin.impl.common.ItemLifeCycleSourceImpl;
66 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
67 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
68 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
69 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
70 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
71 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
72 import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
73 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
74 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
75 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
76 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
77 import org.opendaylight.openflowplugin.impl.util.MatchUtil;
78 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
79 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.Match;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
102 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
103 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketIn;
104 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
105 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder;
106 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
107 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
108 import org.opendaylight.yangtools.yang.binding.DataContainer;
109 import org.opendaylight.yangtools.yang.binding.DataObject;
110 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
111 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
112 import org.slf4j.Logger;
113 import org.slf4j.LoggerFactory;
115 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper {
117 private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
119 // TODO: drain factor should be parametrized
120 private static final float REJECTED_DRAIN_FACTOR = 0.25f;
121 // TODO: low water mark factor should be parametrized
122 private static final float LOW_WATERMARK_FACTOR = 0.75f;
123 // TODO: high water mark factor should be parametrized
124 private static final float HIGH_WATERMARK_FACTOR = 0.95f;
126 // Timeout in milliseconds after what we will give up on initializing device
127 private static final int DEVICE_INIT_TIMEOUT = 9000;
129 // Timeout in milliseconds after what we will give up on closing transaction chain
130 private static final int TX_CHAIN_CLOSE_TIMEOUT = 10000;
132 private static final int LOW_WATERMARK = 1000;
133 private static final int HIGH_WATERMARK = 2000;
135 private final MultipartWriterProvider writerProvider;
136 private final HashedWheelTimer hashedWheelTimer;
137 private final DeviceState deviceState;
138 private final DataBroker dataBroker;
139 private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
140 private final MessageSpy messageSpy;
141 private final ItemLifeCycleKeeper flowLifeCycleKeeper;
142 private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
143 private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
144 private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight
145 .flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
146 private final TranslatorLibrary translatorLibrary;
147 private final ItemLifeCycleRegistry itemLifeCycleSourceRegistry;
148 private final ConvertorExecutor convertorExecutor;
149 private final DeviceInitializerProvider deviceInitializerProvider;
150 private final PacketInRateLimiter packetInLimiter;
151 private final DeviceInfo deviceInfo;
152 private final ConnectionContext primaryConnectionContext;
153 private final boolean skipTableFeatures;
154 private final boolean switchFeaturesMandatory;
155 private final boolean isFlowRemovedNotificationOn;
156 private final boolean useSingleLayerSerialization;
157 private final AtomicBoolean initialized = new AtomicBoolean(false);
158 private final AtomicBoolean hasState = new AtomicBoolean(false);
159 private final AtomicBoolean isInitialTransactionSubmitted = new AtomicBoolean(false);
160 private NotificationPublishService notificationPublishService;
161 private TransactionChainManager transactionChainManager;
162 private DeviceFlowRegistry deviceFlowRegistry;
163 private DeviceGroupRegistry deviceGroupRegistry;
164 private DeviceMeterRegistry deviceMeterRegistry;
165 private ExtensionConverterProvider extensionConverterProvider;
166 private ContextChainMastershipWatcher contextChainMastershipWatcher;
168 DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
169 @Nonnull final DataBroker dataBroker,
170 @Nonnull final MessageSpy messageSpy,
171 @Nonnull final TranslatorLibrary translatorLibrary,
172 final ConvertorExecutor convertorExecutor,
173 final boolean skipTableFeatures,
174 final HashedWheelTimer hashedWheelTimer,
175 final boolean useSingleLayerSerialization,
176 final DeviceInitializerProvider deviceInitializerProvider,
177 final boolean isFlowRemovedNotificationOn,
178 final boolean switchFeaturesMandatory) {
180 this.primaryConnectionContext = primaryConnectionContext;
181 this.deviceInfo = primaryConnectionContext.getDeviceInfo();
182 this.hashedWheelTimer = hashedWheelTimer;
183 this.deviceInitializerProvider = deviceInitializerProvider;
184 this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
185 this.switchFeaturesMandatory = switchFeaturesMandatory;
186 this.deviceState = new DeviceStateImpl();
187 this.dataBroker = dataBroker;
188 this.messageSpy = messageSpy;
190 this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
191 /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
193 this.translatorLibrary = translatorLibrary;
194 this.portStatusTranslator = translatorLibrary.lookupTranslator(
195 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
196 this.packetInTranslator = translatorLibrary.lookupTranslator(
197 new TranslatorKey(deviceInfo.getVersion(), org.opendaylight.yang.gen.v1.urn.opendaylight.openflow
199 .PacketIn.class.getName()));
200 this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
201 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
203 this.itemLifeCycleSourceRegistry = new ItemLifeCycleRegistryImpl();
204 this.flowLifeCycleKeeper = new ItemLifeCycleSourceImpl();
205 this.itemLifeCycleSourceRegistry.registerLifeCycleSource(flowLifeCycleKeeper);
206 this.convertorExecutor = convertorExecutor;
207 this.skipTableFeatures = skipTableFeatures;
208 this.useSingleLayerSerialization = useSingleLayerSerialization;
209 writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
213 public boolean initialSubmitTransaction() {
214 if (!initialized.get()) {
218 final boolean initialSubmit = transactionChainManager.initialSubmitWriteTransaction();
219 isInitialTransactionSubmitted.set(initialSubmit);
220 return initialSubmit;
224 public DeviceState getDeviceState() {
229 public ReadOnlyTransaction getReadTransaction() {
230 return dataBroker.newReadOnlyTransaction();
234 public boolean isTransactionsEnabled() {
235 return isInitialTransactionSubmitted.get();
239 public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
240 final InstanceIdentifier<T> path,
242 if (initialized.get()) {
243 transactionChainManager.writeToTransaction(store, path, data, false);
248 public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
249 final InstanceIdentifier<T> path,
251 if (initialized.get()) {
252 transactionChainManager.writeToTransaction(store, path, data, true);
257 public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store,
258 final InstanceIdentifier<T> path) {
259 if (initialized.get()) {
260 transactionChainManager.addDeleteOperationTotTxChain(store, path);
265 public boolean submitTransaction() {
266 return initialized.get() && transactionChainManager.submitTransaction();
270 public ConnectionContext getPrimaryConnectionContext() {
271 return primaryConnectionContext;
275 public DeviceFlowRegistry getDeviceFlowRegistry() {
276 return deviceFlowRegistry;
280 public DeviceGroupRegistry getDeviceGroupRegistry() {
281 return deviceGroupRegistry;
285 public DeviceMeterRegistry getDeviceMeterRegistry() {
286 return deviceMeterRegistry;
290 public void processReply(final OfHeader ofHeader) {
291 messageSpy.spyMessage(
292 ofHeader.getImplementedInterface(),
293 (ofHeader instanceof Error)
294 ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
295 : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
299 public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
300 ofHeaderList.forEach(header -> messageSpy.spyMessage(
301 header.getImplementedInterface(),
302 (header instanceof Error)
303 ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
304 : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
308 public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
309 //1. translate to general flow (table, priority, match, cookie)
310 final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification =
311 flowRemovedTranslator.translate(flowRemoved, deviceInfo, null);
313 if (isFlowRemovedNotificationOn) {
314 // Trigger off a notification
315 notificationPublishService.offerNotification(flowRemovedNotification);
318 final ItemLifecycleListener itemLifecycleListener = flowLifeCycleKeeper.getItemLifecycleListener();
319 if (itemLifecycleListener != null) {
320 //2. create registry key
321 final FlowRegistryKey flowRegKey = FlowRegistryKeyFactory.create(getDeviceInfo().getVersion(),
322 flowRemovedNotification);
324 final FlowDescriptor flowDescriptor = deviceFlowRegistry.retrieveDescriptor(flowRegKey);
325 //4. if flowId present:
326 if (flowDescriptor != null) {
327 // a) construct flow path
328 final KeyedInstanceIdentifier<Flow, FlowKey> flowPath = getDeviceInfo().getNodeInstanceIdentifier()
329 .augmentation(FlowCapableNode.class)
330 .child(Table.class, flowDescriptor.getTableKey())
331 .child(Flow.class, new FlowKey(flowDescriptor.getFlowId()));
332 // b) notify listener
333 itemLifecycleListener.onRemoved(flowPath);
335 LOG.debug("flow id not found: nodeId={} tableId={}, priority={}",
336 getDeviceInfo().getNodeId(), flowRegKey.getTableId(), flowRemovedNotification.getPriority());
342 @SuppressWarnings("checkstyle:IllegalCatch")
343 public void processPortStatusMessage(final PortStatusMessage portStatus) {
344 messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.StatisticsGroup
345 .FROM_SWITCH_PUBLISHED_SUCCESS);
347 if (initialized.get()) {
349 writePortStatusMessage(portStatus);
351 } catch (final Exception e) {
352 LOG.warn("Error processing port status message for port {} on device {}",
353 portStatus.getPortNo(), getDeviceInfo(), e);
355 } else if (!hasState.get()) {
356 primaryConnectionContext.handlePortStatusMessage(portStatus);
360 private void writePortStatusMessage(final PortStatus portStatusMessage) {
361 final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
362 .translate(portStatusMessage, getDeviceInfo(), null);
364 final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
365 .getNodeInstanceIdentifier()
366 .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
367 .nodeConnectorIdfromDatapathPortNo(
368 deviceInfo.getDatapathId(),
369 portStatusMessage.getPortNo(),
370 OpenflowVersion.get(deviceInfo.getVersion()))));
372 if (PortReason.OFPPRADD.equals(portStatusMessage.getReason())
373 || PortReason.OFPPRMODIFY.equals(portStatusMessage.getReason())) {
374 // because of ADD status node connector has to be created
375 writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
376 .setKey(iiToNodeConnector.getKey())
377 .addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new
378 FlowCapableNodeConnectorStatisticsDataBuilder().build())
379 .addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector)
381 } else if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
382 addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
387 public void processPacketInMessage(final PacketInMessage packetInMessage) {
388 final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
389 handlePacketInMessage(packetReceived, packetInMessage.getImplementedInterface(), packetReceived.getMatch());
392 private void handlePacketInMessage(final PacketIn packetIn,
393 final Class<?> implementedInterface,
395 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH);
396 final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
398 if (packetIn == null) {
399 LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
400 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
404 final OpenflowVersion openflowVersion = OpenflowVersion.get(deviceInfo.getVersion());
406 // Try to get ingress from match
407 final NodeConnectorRef nodeConnectorRef = Objects.nonNull(packetIn.getIngress())
408 ? packetIn.getIngress() : Optional.ofNullable(match)
409 .map(Match::getInPort)
410 .map(nodeConnectorId -> InventoryDataServiceUtil
411 .portNumberfromNodeConnectorId(
414 .map(portNumber -> InventoryDataServiceUtil
415 .nodeConnectorRefFromDatapathIdPortno(
416 deviceInfo.getDatapathId(),
421 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
423 if (!packetInLimiter.acquirePermit()) {
424 LOG.debug("Packet limited");
425 // TODO: save packet into emergency slot if possible
426 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
427 .FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
431 final ListenableFuture<?> offerNotification = notificationPublishService
432 .offerNotification(new PacketReceivedBuilder(packetIn)
433 .setIngress(nodeConnectorRef)
434 .setMatch(MatchUtil.transformMatch(match,
435 org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.packet.received
439 if (NotificationPublishService.REJECTED.equals(offerNotification)) {
440 LOG.debug("notification offer rejected");
441 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
442 packetInLimiter.drainLowWaterMark();
443 packetInLimiter.releasePermit();
447 Futures.addCallback(offerNotification, new FutureCallback<Object>() {
449 public void onSuccess(final Object result) {
450 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
451 packetInLimiter.releasePermit();
455 public void onFailure(final Throwable throwable) {
456 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
457 .FROM_SWITCH_NOTIFICATION_REJECTED);
458 LOG.debug("notification offer failed: {}", throwable.getMessage());
459 LOG.trace("notification offer failed..", throwable);
460 packetInLimiter.releasePermit();
466 public void processExperimenterMessage(final ExperimenterMessage notification) {
468 final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
469 final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
470 getDeviceInfo().getVersion(),
471 (Class<? extends ExperimenterDataOfChoice>) vendorData.getImplementedInterface());
472 final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter =
473 extensionConverterProvider.getMessageConverter(key);
474 if (messageConverter == null) {
475 LOG.warn("custom converter for {}[OF:{}] not found",
476 notification.getExperimenterDataOfChoice().getImplementedInterface(),
477 getDeviceInfo().getVersion());
480 // build notification
481 final ExperimenterMessageOfChoice messageOfChoice;
483 messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
484 final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new
485 ExperimenterMessageFromDevBuilder()
486 .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
487 .setExperimenterMessageOfChoice(messageOfChoice);
489 notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
490 } catch (final ConversionException e) {
491 LOG.error("Conversion of experimenter notification failed", e);
496 public boolean processAlienMessage(final OfHeader message) {
497 final Class<? extends DataContainer> implementedInterface = message.getImplementedInterface();
499 if (Objects.nonNull(implementedInterface) && org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
500 .rev130709.PacketInMessage.class.equals(implementedInterface)) {
501 final org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
502 .PacketInMessage packetInMessage = org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
503 .rev130709.PacketInMessage.class.cast(message);
505 handlePacketInMessage(packetInMessage, implementedInterface, packetInMessage.getMatch());
513 public TranslatorLibrary oook() {
514 return translatorLibrary;
518 public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
519 this.notificationPublishService = notificationPublishService;
523 public MessageSpy getMessageSpy() {
528 public void onPublished() {
529 primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
533 public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>>
535 return new MultiMsgCollectorImpl<>(this, requestContext);
539 public void updatePacketInRateLimit(final long upperBound) {
540 packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound),
541 (int) (HIGH_WATERMARK_FACTOR * upperBound));
545 public ItemLifeCycleRegistry getItemLifeCycleSourceRegistry() {
546 return itemLifeCycleSourceRegistry;
550 public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
551 this.extensionConverterProvider = extensionConverterProvider;
555 public ExtensionConverterProvider getExtensionConverterProvider() {
556 return extensionConverterProvider;
560 TransactionChainManager getTransactionChainManager() {
561 return this.transactionChainManager;
565 public ListenableFuture<Void> closeServiceInstance() {
566 final ListenableFuture<Void> listenableFuture = initialized.get()
567 ? transactionChainManager.deactivateTransactionManager()
568 : Futures.immediateFuture(null);
570 hashedWheelTimer.newTimeout((timerTask) -> {
571 if (!listenableFuture.isDone() && !listenableFuture.isCancelled()) {
572 listenableFuture.cancel(true);
574 }, TX_CHAIN_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS);
576 return listenableFuture;
580 public DeviceInfo getDeviceInfo() {
581 return this.deviceInfo;
585 public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher) {
586 this.contextChainMastershipWatcher = contextChainMastershipWatcher;
591 public ServiceGroupIdentifier getIdentifier() {
592 return deviceInfo.getServiceIdentifier();
596 public void close() {
597 // Close all datastore registries and transactions
598 if (initialized.getAndSet(false)) {
599 deviceGroupRegistry.close();
600 deviceFlowRegistry.close();
601 deviceMeterRegistry.close();
603 final ListenableFuture<Void> txChainShuttingDown = transactionChainManager.shuttingDown();
605 Futures.addCallback(txChainShuttingDown, new FutureCallback<Void>() {
607 public void onSuccess(@Nullable final Void result) {
608 transactionChainManager.close();
609 transactionChainManager = null;
613 public void onFailure(final Throwable throwable) {
614 transactionChainManager.close();
615 transactionChainManager = null;
620 requestContexts.forEach(requestContext -> RequestContextUtil
621 .closeRequestContextWithRpcError(requestContext, "Connection closed."));
622 requestContexts.clear();
626 public boolean canUseSingleLayerSerialization() {
627 return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
631 @SuppressWarnings("checkstyle:IllegalCatch")
632 public void instantiateServiceInstance() {
633 lazyTransactionManagerInitialization();
636 final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
637 .retrieveAndClearPortStatusMessages();
639 portStatusMessages.forEach(this::writePortStatusMessage);
641 } catch (final Exception ex) {
642 throw new RuntimeException(String.format("Error processing port status messages from device %s: %s",
643 deviceInfo.toString(),
647 final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
648 .lookup(deviceInfo.getVersion());
650 if (initializer.isPresent()) {
651 final Future<Void> initialize = initializer
653 .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor);
656 initialize.get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
657 } catch (TimeoutException ex) {
658 initialize.cancel(true);
659 throw new RuntimeException(String.format("Failed to initialize device %s in %ss: %s",
660 deviceInfo.toString(),
661 String.valueOf(DEVICE_INIT_TIMEOUT / 1000),
663 } catch (ExecutionException | InterruptedException ex) {
664 throw new RuntimeException(String.format("Device %s cannot be initialized: %s",
665 deviceInfo.toString(),
669 throw new RuntimeException(String.format("Unsupported version %s for device %s",
670 deviceInfo.getVersion(),
671 deviceInfo.toString()));
674 final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill =
675 getDeviceFlowRegistry().fill();
676 Futures.addCallback(deviceFlowRegistryFill,
677 new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher));
681 void lazyTransactionManagerInitialization() {
682 if (!this.initialized.get()) {
683 if (LOG.isDebugEnabled()) {
684 LOG.debug("Transaction chain manager for node {} created", deviceInfo);
686 this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo.getNodeId().getValue());
687 this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker,
688 deviceInfo.getNodeInstanceIdentifier());
689 this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
690 this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
693 transactionChainManager.activateTransactionManager();
694 initialized.set(true);
699 public <T> RequestContext<T> createRequestContext() {
700 final Long xid = deviceInfo.reserveXidForDeviceMessage();
702 final AbstractRequestContext<T> abstractRequestContext = new AbstractRequestContext<T>(xid) {
704 public void close() {
705 requestContexts.remove(this);
709 requestContexts.add(abstractRequestContext);
710 return abstractRequestContext;
714 public void onStateAcquired(final ContextChainState state) {
718 private class DeviceFlowRegistryCallback implements FutureCallback<List<com.google.common.base
719 .Optional<FlowCapableNode>>> {
720 private final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill;
721 private final ContextChainMastershipWatcher contextChainMastershipWatcher;
723 DeviceFlowRegistryCallback(
724 ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill,
725 ContextChainMastershipWatcher contextChainMastershipWatcher) {
726 this.deviceFlowRegistryFill = deviceFlowRegistryFill;
727 this.contextChainMastershipWatcher = contextChainMastershipWatcher;
731 public void onSuccess(@Nullable List<com.google.common.base.Optional<FlowCapableNode>> result) {
732 if (LOG.isDebugEnabled()) {
733 // Count all flows we read from datastore for debugging purposes.
734 // This number do not always represent how many flows were actually added
735 // to DeviceFlowRegistry, because of possible duplicates.
736 long flowCount = Optional.ofNullable(result)
737 .map(Collections::singleton)
738 .orElse(Collections.emptySet())
740 .flatMap(Collection::stream)
741 .filter(Objects::nonNull)
742 .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
743 .filter(Objects::nonNull)
744 .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
745 .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
746 .filter(Objects::nonNull)
747 .filter(table -> Objects.nonNull(table.getFlow()))
748 .flatMap(table -> table.getFlow().stream())
749 .filter(Objects::nonNull)
752 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo
755 this.contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState
756 .INITIAL_FLOW_REGISTRY_FILL);
760 public void onFailure(Throwable throwable) {
761 if (deviceFlowRegistryFill.isCancelled()) {
762 if (LOG.isDebugEnabled()) {
763 LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo);
766 LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo
769 contextChainMastershipWatcher.onNotAbleToStartMastership(
771 "Was not able to fill flow registry on device",