Rename addDeleteOperationTotTxChain => addDeleteOperationToTxChain
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import io.netty.util.HashedWheelTimer;
15 import java.util.Collection;
16 import java.util.Collections;
17 import java.util.HashSet;
18 import java.util.List;
19 import java.util.Objects;
20 import java.util.Optional;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import javax.annotation.Nonnull;
27 import javax.annotation.Nullable;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
30 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
33 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
34 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
35 import org.opendaylight.openflowplugin.api.OFConstants;
36 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
40 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
41 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
42 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
43 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
44 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
45 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
46 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
48 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
49 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
50 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
51 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
52 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
53 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
54 import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
55 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
56 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
57 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
58 import org.opendaylight.openflowplugin.extension.api.exception.ConversionException;
59 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
60 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
61 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
62 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
63 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
64 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
65 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
66 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
67 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
68 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
69 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
70 import org.opendaylight.openflowplugin.impl.util.MatchUtil;
71 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
72 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.Match;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketIn;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
98 import org.opendaylight.yangtools.yang.binding.DataContainer;
99 import org.opendaylight.yangtools.yang.binding.DataObject;
100 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
101 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
102 import org.slf4j.Logger;
103 import org.slf4j.LoggerFactory;
104
105 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper {
106
107     private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
108
109     // TODO: drain factor should be parametrized
110     private static final float REJECTED_DRAIN_FACTOR = 0.25f;
111     // TODO: low water mark factor should be parametrized
112     private static final float LOW_WATERMARK_FACTOR = 0.75f;
113     // TODO: high water mark factor should be parametrized
114     private static final float HIGH_WATERMARK_FACTOR = 0.95f;
115
116     // Timeout in milliseconds after what we will give up on initializing device
117     private static final int DEVICE_INIT_TIMEOUT = 9000;
118
119     // Timeout in milliseconds after what we will give up on closing transaction chain
120     private static final int TX_CHAIN_CLOSE_TIMEOUT = 10000;
121
122     private static final int LOW_WATERMARK = 1000;
123     private static final int HIGH_WATERMARK = 2000;
124
125     private final MultipartWriterProvider writerProvider;
126     private final HashedWheelTimer hashedWheelTimer;
127     private final DeviceState deviceState;
128     private final DataBroker dataBroker;
129     private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
130     private final MessageSpy messageSpy;
131     private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
132     private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
133     private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight
134             .flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
135     private final TranslatorLibrary translatorLibrary;
136     private final ConvertorExecutor convertorExecutor;
137     private final DeviceInitializerProvider deviceInitializerProvider;
138     private final PacketInRateLimiter packetInLimiter;
139     private final DeviceInfo deviceInfo;
140     private final ConnectionContext primaryConnectionContext;
141     private final boolean skipTableFeatures;
142     private final boolean switchFeaturesMandatory;
143     private final boolean isFlowRemovedNotificationOn;
144     private final boolean useSingleLayerSerialization;
145     private final AtomicBoolean initialized = new AtomicBoolean(false);
146     private final AtomicBoolean hasState = new AtomicBoolean(false);
147     private final AtomicBoolean isInitialTransactionSubmitted = new AtomicBoolean(false);
148     private NotificationPublishService notificationPublishService;
149     private TransactionChainManager transactionChainManager;
150     private DeviceFlowRegistry deviceFlowRegistry;
151     private DeviceGroupRegistry deviceGroupRegistry;
152     private DeviceMeterRegistry deviceMeterRegistry;
153     private ExtensionConverterProvider extensionConverterProvider;
154     private ContextChainMastershipWatcher contextChainMastershipWatcher;
155
156     DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
157                       @Nonnull final DataBroker dataBroker,
158                       @Nonnull final MessageSpy messageSpy,
159                       @Nonnull final TranslatorLibrary translatorLibrary,
160                       final ConvertorExecutor convertorExecutor,
161                       final boolean skipTableFeatures,
162                       final HashedWheelTimer hashedWheelTimer,
163                       final boolean useSingleLayerSerialization,
164                       final DeviceInitializerProvider deviceInitializerProvider,
165                       final boolean isFlowRemovedNotificationOn,
166                       final boolean switchFeaturesMandatory) {
167
168         this.primaryConnectionContext = primaryConnectionContext;
169         this.deviceInfo = primaryConnectionContext.getDeviceInfo();
170         this.hashedWheelTimer = hashedWheelTimer;
171         this.deviceInitializerProvider = deviceInitializerProvider;
172         this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
173         this.switchFeaturesMandatory = switchFeaturesMandatory;
174         this.deviceState = new DeviceStateImpl();
175         this.dataBroker = dataBroker;
176         this.messageSpy = messageSpy;
177
178         this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
179                 /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
180
181         this.translatorLibrary = translatorLibrary;
182         this.portStatusTranslator = translatorLibrary.lookupTranslator(
183                 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
184         this.packetInTranslator = translatorLibrary.lookupTranslator(
185                 new TranslatorKey(deviceInfo.getVersion(), org.opendaylight.yang.gen.v1.urn.opendaylight.openflow
186                         .protocol.rev130731
187                         .PacketIn.class.getName()));
188         this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
189                 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
190
191         this.convertorExecutor = convertorExecutor;
192         this.skipTableFeatures = skipTableFeatures;
193         this.useSingleLayerSerialization = useSingleLayerSerialization;
194         writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
195     }
196
197     @Override
198     public boolean initialSubmitTransaction() {
199         if (!initialized.get()) {
200             return false;
201         }
202
203         final boolean initialSubmit = transactionChainManager.initialSubmitWriteTransaction();
204         isInitialTransactionSubmitted.set(initialSubmit);
205         return initialSubmit;
206     }
207
208     @Override
209     public DeviceState getDeviceState() {
210         return deviceState;
211     }
212
213     @Override
214     public ReadOnlyTransaction getReadTransaction() {
215         return dataBroker.newReadOnlyTransaction();
216     }
217
218     @Override
219     public boolean isTransactionsEnabled() {
220         return isInitialTransactionSubmitted.get();
221     }
222
223     @Override
224     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
225                                                           final InstanceIdentifier<T> path,
226                                                           final T data) {
227         if (initialized.get()) {
228             transactionChainManager.writeToTransaction(store, path, data, false);
229         }
230     }
231
232     @Override
233     public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
234                                                                          final InstanceIdentifier<T> path,
235                                                                          final T data) {
236         if (initialized.get()) {
237             transactionChainManager.writeToTransaction(store, path, data, true);
238         }
239     }
240
241     @Override
242     public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store,
243                                                           final InstanceIdentifier<T> path) {
244         if (initialized.get()) {
245             transactionChainManager.addDeleteOperationToTxChain(store, path);
246         }
247     }
248
249     @Override
250     public boolean submitTransaction() {
251         return initialized.get() && transactionChainManager.submitTransaction();
252     }
253
254     @Override
255     public ConnectionContext getPrimaryConnectionContext() {
256         return primaryConnectionContext;
257     }
258
259     @Override
260     public DeviceFlowRegistry getDeviceFlowRegistry() {
261         return deviceFlowRegistry;
262     }
263
264     @Override
265     public DeviceGroupRegistry getDeviceGroupRegistry() {
266         return deviceGroupRegistry;
267     }
268
269     @Override
270     public DeviceMeterRegistry getDeviceMeterRegistry() {
271         return deviceMeterRegistry;
272     }
273
274     @Override
275     public void processReply(final OfHeader ofHeader) {
276         messageSpy.spyMessage(
277                 ofHeader.getImplementedInterface(),
278                 (ofHeader instanceof Error)
279                         ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
280                         : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
281     }
282
283     @Override
284     public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
285         ofHeaderList.forEach(header -> messageSpy.spyMessage(
286                 header.getImplementedInterface(),
287                 (header instanceof Error)
288                         ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
289                         : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
290     }
291
292     @Override
293     public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
294         //1. translate to general flow (table, priority, match, cookie)
295         final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification =
296                 flowRemovedTranslator.translate(flowRemoved, deviceInfo, null);
297
298         if (isFlowRemovedNotificationOn) {
299             // Trigger off a notification
300             notificationPublishService.offerNotification(flowRemovedNotification);
301         }
302     }
303
304     @Override
305     @SuppressWarnings("checkstyle:IllegalCatch")
306     public void processPortStatusMessage(final PortStatusMessage portStatus) {
307         messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.StatisticsGroup
308                 .FROM_SWITCH_PUBLISHED_SUCCESS);
309
310         if (initialized.get()) {
311             try {
312                 writePortStatusMessage(portStatus);
313                 submitTransaction();
314             } catch (final Exception e) {
315                 LOG.warn("Error processing port status message for port {} on device {}",
316                         portStatus.getPortNo(), getDeviceInfo(), e);
317             }
318         } else if (!hasState.get()) {
319             primaryConnectionContext.handlePortStatusMessage(portStatus);
320         }
321     }
322
323     private void writePortStatusMessage(final PortStatus portStatusMessage) {
324         final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
325                 .translate(portStatusMessage, getDeviceInfo(), null);
326
327         final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
328                 .getNodeInstanceIdentifier()
329                 .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
330                         .nodeConnectorIdfromDatapathPortNo(
331                                 deviceInfo.getDatapathId(),
332                                 portStatusMessage.getPortNo(),
333                                 OpenflowVersion.get(deviceInfo.getVersion()))));
334
335         if (PortReason.OFPPRADD.equals(portStatusMessage.getReason())
336                 || PortReason.OFPPRMODIFY.equals(portStatusMessage.getReason())) {
337             // because of ADD status node connector has to be created
338             writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
339                     .setKey(iiToNodeConnector.getKey())
340                     .addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new
341                             FlowCapableNodeConnectorStatisticsDataBuilder().build())
342                     .addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector)
343                     .build());
344         } else if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
345             addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
346         }
347     }
348
349     @Override
350     public void processPacketInMessage(final PacketInMessage packetInMessage) {
351         final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
352         handlePacketInMessage(packetReceived, packetInMessage.getImplementedInterface(), packetReceived.getMatch());
353     }
354
355     private void handlePacketInMessage(final PacketIn packetIn,
356                                        final Class<?> implementedInterface,
357                                        final Match match) {
358         messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH);
359         final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
360
361         if (packetIn == null) {
362             LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
363             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
364             return;
365         }
366
367         final OpenflowVersion openflowVersion = OpenflowVersion.get(deviceInfo.getVersion());
368
369         // Try to get ingress from match
370         final NodeConnectorRef nodeConnectorRef = Objects.nonNull(packetIn.getIngress())
371                 ? packetIn.getIngress() : Optional.ofNullable(match)
372                 .map(Match::getInPort)
373                 .map(nodeConnectorId -> InventoryDataServiceUtil
374                         .portNumberfromNodeConnectorId(
375                                 openflowVersion,
376                                 nodeConnectorId))
377                 .map(portNumber -> InventoryDataServiceUtil
378                         .nodeConnectorRefFromDatapathIdPortno(
379                                 deviceInfo.getDatapathId(),
380                                 portNumber,
381                                 openflowVersion))
382                 .orElse(null);
383
384         messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
385
386         if (!packetInLimiter.acquirePermit()) {
387             LOG.debug("Packet limited");
388             // TODO: save packet into emergency slot if possible
389             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
390                     .FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
391             return;
392         }
393
394         final ListenableFuture<?> offerNotification = notificationPublishService
395                 .offerNotification(new PacketReceivedBuilder(packetIn)
396                         .setIngress(nodeConnectorRef)
397                         .setMatch(MatchUtil.transformMatch(match,
398                                 org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.packet.received
399                                         .Match.class))
400                         .build());
401
402         if (NotificationPublishService.REJECTED.equals(offerNotification)) {
403             LOG.debug("notification offer rejected");
404             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
405             packetInLimiter.drainLowWaterMark();
406             packetInLimiter.releasePermit();
407             return;
408         }
409
410         Futures.addCallback(offerNotification, new FutureCallback<Object>() {
411             @Override
412             public void onSuccess(final Object result) {
413                 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
414                 packetInLimiter.releasePermit();
415             }
416
417             @Override
418             public void onFailure(final Throwable throwable) {
419                 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
420                         .FROM_SWITCH_NOTIFICATION_REJECTED);
421                 LOG.debug("notification offer failed: {}", throwable.getMessage());
422                 LOG.trace("notification offer failed..", throwable);
423                 packetInLimiter.releasePermit();
424             }
425         });
426     }
427
428     @Override
429     public void processExperimenterMessage(final ExperimenterMessage notification) {
430         // lookup converter
431         final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
432         final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
433                 getDeviceInfo().getVersion(),
434                 (Class<? extends ExperimenterDataOfChoice>) vendorData.getImplementedInterface());
435         final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter =
436                 extensionConverterProvider.getMessageConverter(key);
437         if (messageConverter == null) {
438             LOG.warn("custom converter for {}[OF:{}] not found",
439                     notification.getExperimenterDataOfChoice().getImplementedInterface(),
440                     getDeviceInfo().getVersion());
441             return;
442         }
443         // build notification
444         final ExperimenterMessageOfChoice messageOfChoice;
445         try {
446             messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
447             final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new
448                     ExperimenterMessageFromDevBuilder()
449                     .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
450                     .setExperimenterMessageOfChoice(messageOfChoice);
451             // publish
452             notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
453         } catch (final ConversionException e) {
454             LOG.error("Conversion of experimenter notification failed", e);
455         }
456     }
457
458     @Override
459     public boolean processAlienMessage(final OfHeader message) {
460         final Class<? extends DataContainer> implementedInterface = message.getImplementedInterface();
461
462         if (Objects.nonNull(implementedInterface) && org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
463                 .rev130709.PacketInMessage.class.equals(implementedInterface)) {
464             final org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
465                     .PacketInMessage packetInMessage = org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
466                     .rev130709.PacketInMessage.class.cast(message);
467
468             handlePacketInMessage(packetInMessage, implementedInterface, packetInMessage.getMatch());
469             return true;
470         }
471
472         return false;
473     }
474
475     @Override
476     public TranslatorLibrary oook() {
477         return translatorLibrary;
478     }
479
480     @Override
481     public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
482         this.notificationPublishService = notificationPublishService;
483     }
484
485     @Override
486     public MessageSpy getMessageSpy() {
487         return messageSpy;
488     }
489
490     @Override
491     public void onPublished() {
492         primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
493     }
494
495     @Override
496     public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>>
497                                                                                       requestContext) {
498         return new MultiMsgCollectorImpl<>(this, requestContext);
499     }
500
501     @Override
502     public void updatePacketInRateLimit(final long upperBound) {
503         packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound),
504                 (int) (HIGH_WATERMARK_FACTOR * upperBound));
505     }
506
507     @Override
508     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
509         this.extensionConverterProvider = extensionConverterProvider;
510     }
511
512     @Override
513     public ExtensionConverterProvider getExtensionConverterProvider() {
514         return extensionConverterProvider;
515     }
516
517     @VisibleForTesting
518     TransactionChainManager getTransactionChainManager() {
519         return this.transactionChainManager;
520     }
521
522     @Override
523     public ListenableFuture<Void> closeServiceInstance() {
524         final ListenableFuture<Void> listenableFuture = initialized.get()
525                 ? transactionChainManager.deactivateTransactionManager()
526                 : Futures.immediateFuture(null);
527
528         hashedWheelTimer.newTimeout((timerTask) -> {
529             if (!listenableFuture.isDone() && !listenableFuture.isCancelled()) {
530                 listenableFuture.cancel(true);
531             }
532         }, TX_CHAIN_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS);
533
534         return listenableFuture;
535     }
536
537     @Override
538     public DeviceInfo getDeviceInfo() {
539         return this.deviceInfo;
540     }
541
542     @Override
543     public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher) {
544         this.contextChainMastershipWatcher = contextChainMastershipWatcher;
545     }
546
547     @Nonnull
548     @Override
549     public ServiceGroupIdentifier getIdentifier() {
550         return deviceInfo.getServiceIdentifier();
551     }
552
553     @Override
554     public void close() {
555         // Close all datastore registries and transactions
556         if (initialized.getAndSet(false)) {
557             deviceGroupRegistry.close();
558             deviceFlowRegistry.close();
559             deviceMeterRegistry.close();
560
561             final ListenableFuture<Void> txChainShuttingDown = transactionChainManager.shuttingDown();
562
563             Futures.addCallback(txChainShuttingDown, new FutureCallback<Void>() {
564                 @Override
565                 public void onSuccess(@Nullable final Void result) {
566                     transactionChainManager.close();
567                     transactionChainManager = null;
568                 }
569
570                 @Override
571                 public void onFailure(final Throwable throwable) {
572                     transactionChainManager.close();
573                     transactionChainManager = null;
574                 }
575             });
576         }
577
578         requestContexts.forEach(requestContext -> RequestContextUtil
579                 .closeRequestContextWithRpcError(requestContext, "Connection closed."));
580         requestContexts.clear();
581     }
582
583     @Override
584     public boolean canUseSingleLayerSerialization() {
585         return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
586     }
587
588     // TODO: exception handling should be fixed by using custom checked exception, never RuntimeExceptions
589     @Override
590     @SuppressWarnings({"checkstyle:IllegalCatch", "checkstyle:AvoidHidingCauseExceptionCheck"})
591     public void instantiateServiceInstance() {
592         lazyTransactionManagerInitialization();
593
594         try {
595             final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
596                     .retrieveAndClearPortStatusMessages();
597
598             portStatusMessages.forEach(this::writePortStatusMessage);
599             submitTransaction();
600         } catch (final Exception ex) {
601             throw new RuntimeException(String.format("Error processing port status messages from device %s: %s",
602                     deviceInfo.toString(),
603                     ex.toString()));
604         }
605
606         final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
607                 .lookup(deviceInfo.getVersion());
608
609         if (initializer.isPresent()) {
610             final Future<Void> initialize = initializer
611                     .get()
612                     .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor);
613
614             try {
615                 initialize.get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
616             } catch (TimeoutException ex) {
617                 initialize.cancel(true);
618                 throw new RuntimeException(String.format("Failed to initialize device %s in %ss: %s",
619                         deviceInfo.toString(),
620                         String.valueOf(DEVICE_INIT_TIMEOUT / 1000),
621                         ex.toString()));
622             } catch (ExecutionException | InterruptedException ex) {
623                 throw new RuntimeException(String.format("Device %s cannot be initialized: %s",
624                         deviceInfo.toString(),
625                         ex.toString()));
626             }
627         } else {
628             throw new RuntimeException(String.format("Unsupported version %s for device %s",
629                     deviceInfo.getVersion(),
630                     deviceInfo.toString()));
631         }
632
633         final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill =
634                 getDeviceFlowRegistry().fill();
635         Futures.addCallback(deviceFlowRegistryFill,
636                 new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher));
637     }
638
639     @VisibleForTesting
640     void lazyTransactionManagerInitialization() {
641         if (!this.initialized.get()) {
642             if (LOG.isDebugEnabled()) {
643                 LOG.debug("Transaction chain manager for node {} created", deviceInfo);
644             }
645             this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo.getNodeId().getValue());
646             this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker,
647                     deviceInfo.getNodeInstanceIdentifier());
648             this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
649             this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
650         }
651
652         transactionChainManager.activateTransactionManager();
653         initialized.set(true);
654     }
655
656     @Nullable
657     @Override
658     public <T> RequestContext<T> createRequestContext() {
659         final Long xid = deviceInfo.reserveXidForDeviceMessage();
660
661         final AbstractRequestContext<T> abstractRequestContext = new AbstractRequestContext<T>(xid) {
662             @Override
663             public void close() {
664                 requestContexts.remove(this);
665             }
666         };
667
668         requestContexts.add(abstractRequestContext);
669         return abstractRequestContext;
670     }
671
672     @Override
673     public void onStateAcquired(final ContextChainState state) {
674         hasState.set(true);
675     }
676
677     private class DeviceFlowRegistryCallback implements FutureCallback<List<com.google.common.base
678             .Optional<FlowCapableNode>>> {
679         private final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill;
680         private final ContextChainMastershipWatcher contextChainMastershipWatcher;
681
682         DeviceFlowRegistryCallback(
683                 ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill,
684                 ContextChainMastershipWatcher contextChainMastershipWatcher) {
685             this.deviceFlowRegistryFill = deviceFlowRegistryFill;
686             this.contextChainMastershipWatcher = contextChainMastershipWatcher;
687         }
688
689         @Override
690         public void onSuccess(@Nullable List<com.google.common.base.Optional<FlowCapableNode>> result) {
691             if (LOG.isDebugEnabled()) {
692                 // Count all flows we read from datastore for debugging purposes.
693                 // This number do not always represent how many flows were actually added
694                 // to DeviceFlowRegistry, because of possible duplicates.
695                 long flowCount = Optional.ofNullable(result)
696                         .map(Collections::singleton)
697                         .orElse(Collections.emptySet())
698                         .stream()
699                         .flatMap(Collection::stream)
700                         .filter(Objects::nonNull)
701                         .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
702                         .filter(Objects::nonNull)
703                         .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
704                         .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
705                         .filter(Objects::nonNull)
706                         .filter(table -> Objects.nonNull(table.getFlow()))
707                         .flatMap(table -> table.getFlow().stream())
708                         .filter(Objects::nonNull)
709                         .count();
710
711                 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo);
712             }
713             this.contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState
714                     .INITIAL_FLOW_REGISTRY_FILL);
715         }
716
717         @Override
718         public void onFailure(Throwable throwable) {
719             if (deviceFlowRegistryFill.isCancelled()) {
720                 if (LOG.isDebugEnabled()) {
721                     LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo);
722                 }
723             } else {
724                 LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo,
725                          throwable);
726             }
727             contextChainMastershipWatcher.onNotAbleToStartMastership(
728                     deviceInfo,
729                     "Was not able to fill flow registry on device",
730                     false);
731         }
732     }
733
734 }