Merge "Remove deprecated EOS services"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import io.netty.util.HashedWheelTimer;
16 import java.util.Collection;
17 import java.util.Collections;
18 import java.util.HashSet;
19 import java.util.List;
20 import java.util.Objects;
21 import java.util.Optional;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import javax.annotation.Nonnull;
28 import javax.annotation.Nullable;
29 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
30 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
31 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
32 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
33 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
34 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
35 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
36 import org.opendaylight.openflowplugin.api.OFConstants;
37 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
41 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
42 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
43 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
44 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
45 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
46 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
48 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
49 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
50 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
51 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
52 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
53 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
54 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
55 import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
56 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
57 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
58 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
59 import org.opendaylight.openflowplugin.extension.api.exception.ConversionException;
60 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
61 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
62 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
63 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
64 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
65 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
66 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
67 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
68 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
69 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
70 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
71 import org.opendaylight.openflowplugin.impl.util.MatchUtil;
72 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
73 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.Match;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketIn;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
99 import org.opendaylight.yangtools.yang.binding.DataContainer;
100 import org.opendaylight.yangtools.yang.binding.DataObject;
101 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
102 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
103 import org.slf4j.Logger;
104 import org.slf4j.LoggerFactory;
105
106 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper {
107
108     private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
109
110     // TODO: drain factor should be parametrized
111     private static final float REJECTED_DRAIN_FACTOR = 0.25f;
112     // TODO: low water mark factor should be parametrized
113     private static final float LOW_WATERMARK_FACTOR = 0.75f;
114     // TODO: high water mark factor should be parametrized
115     private static final float HIGH_WATERMARK_FACTOR = 0.95f;
116
117     // Timeout in milliseconds after what we will give up on initializing device
118     private static final int DEVICE_INIT_TIMEOUT = 9000;
119
120     // Timeout in milliseconds after what we will give up on closing transaction chain
121     private static final int TX_CHAIN_CLOSE_TIMEOUT = 10000;
122
123     private static final int LOW_WATERMARK = 1000;
124     private static final int HIGH_WATERMARK = 2000;
125
126     private final MultipartWriterProvider writerProvider;
127     private final HashedWheelTimer hashedWheelTimer;
128     private final DeviceState deviceState;
129     private final DataBroker dataBroker;
130     private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
131     private final MessageSpy messageSpy;
132     private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
133     private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
134     private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight
135             .flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
136     private final TranslatorLibrary translatorLibrary;
137     private final ConvertorExecutor convertorExecutor;
138     private final DeviceInitializerProvider deviceInitializerProvider;
139     private final PacketInRateLimiter packetInLimiter;
140     private final DeviceInfo deviceInfo;
141     private final ConnectionContext primaryConnectionContext;
142     private final boolean skipTableFeatures;
143     private final boolean switchFeaturesMandatory;
144     private final boolean isFlowRemovedNotificationOn;
145     private final boolean useSingleLayerSerialization;
146     private final AtomicBoolean initialized = new AtomicBoolean(false);
147     private final AtomicBoolean hasState = new AtomicBoolean(false);
148     private final AtomicBoolean isInitialTransactionSubmitted = new AtomicBoolean(false);
149     private NotificationPublishService notificationPublishService;
150     private TransactionChainManager transactionChainManager;
151     private DeviceFlowRegistry deviceFlowRegistry;
152     private DeviceGroupRegistry deviceGroupRegistry;
153     private DeviceMeterRegistry deviceMeterRegistry;
154     private ExtensionConverterProvider extensionConverterProvider;
155     private ContextChainMastershipWatcher contextChainMastershipWatcher;
156
157     DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
158                       @Nonnull final DataBroker dataBroker,
159                       @Nonnull final MessageSpy messageSpy,
160                       @Nonnull final TranslatorLibrary translatorLibrary,
161                       final ConvertorExecutor convertorExecutor,
162                       final boolean skipTableFeatures,
163                       final HashedWheelTimer hashedWheelTimer,
164                       final boolean useSingleLayerSerialization,
165                       final DeviceInitializerProvider deviceInitializerProvider,
166                       final boolean isFlowRemovedNotificationOn,
167                       final boolean switchFeaturesMandatory) {
168
169         this.primaryConnectionContext = primaryConnectionContext;
170         this.deviceInfo = primaryConnectionContext.getDeviceInfo();
171         this.hashedWheelTimer = hashedWheelTimer;
172         this.deviceInitializerProvider = deviceInitializerProvider;
173         this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
174         this.switchFeaturesMandatory = switchFeaturesMandatory;
175         this.deviceState = new DeviceStateImpl();
176         this.dataBroker = dataBroker;
177         this.messageSpy = messageSpy;
178
179         this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
180                 /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
181
182         this.translatorLibrary = translatorLibrary;
183         this.portStatusTranslator = translatorLibrary.lookupTranslator(
184                 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
185         this.packetInTranslator = translatorLibrary.lookupTranslator(
186                 new TranslatorKey(deviceInfo.getVersion(), org.opendaylight.yang.gen.v1.urn.opendaylight.openflow
187                         .protocol.rev130731
188                         .PacketIn.class.getName()));
189         this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
190                 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
191
192         this.convertorExecutor = convertorExecutor;
193         this.skipTableFeatures = skipTableFeatures;
194         this.useSingleLayerSerialization = useSingleLayerSerialization;
195         writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
196     }
197
198     @Override
199     public boolean initialSubmitTransaction() {
200         if (!initialized.get()) {
201             return false;
202         }
203
204         final boolean initialSubmit = transactionChainManager.initialSubmitWriteTransaction();
205         isInitialTransactionSubmitted.set(initialSubmit);
206         return initialSubmit;
207     }
208
209     @Override
210     public DeviceState getDeviceState() {
211         return deviceState;
212     }
213
214     @Override
215     public ReadOnlyTransaction getReadTransaction() {
216         return dataBroker.newReadOnlyTransaction();
217     }
218
219     @Override
220     public boolean isTransactionsEnabled() {
221         return isInitialTransactionSubmitted.get();
222     }
223
224     @Override
225     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
226                                                           final InstanceIdentifier<T> path,
227                                                           final T data) {
228         if (initialized.get()) {
229             transactionChainManager.writeToTransaction(store, path, data, false);
230         }
231     }
232
233     @Override
234     public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
235                                                                          final InstanceIdentifier<T> path,
236                                                                          final T data) {
237         if (initialized.get()) {
238             transactionChainManager.writeToTransaction(store, path, data, true);
239         }
240     }
241
242     @Override
243     public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store,
244                                                           final InstanceIdentifier<T> path) {
245         if (initialized.get()) {
246             transactionChainManager.addDeleteOperationToTxChain(store, path);
247         }
248     }
249
250     @Override
251     public boolean submitTransaction() {
252         return initialized.get() && transactionChainManager.submitTransaction();
253     }
254
255     @Override
256     public ConnectionContext getPrimaryConnectionContext() {
257         return primaryConnectionContext;
258     }
259
260     @Override
261     public DeviceFlowRegistry getDeviceFlowRegistry() {
262         return deviceFlowRegistry;
263     }
264
265     @Override
266     public DeviceGroupRegistry getDeviceGroupRegistry() {
267         return deviceGroupRegistry;
268     }
269
270     @Override
271     public DeviceMeterRegistry getDeviceMeterRegistry() {
272         return deviceMeterRegistry;
273     }
274
275     @Override
276     public void processReply(final OfHeader ofHeader) {
277         messageSpy.spyMessage(
278                 ofHeader.getImplementedInterface(),
279                 ofHeader instanceof Error
280                         ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
281                         : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
282     }
283
284     @Override
285     public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
286         ofHeaderList.forEach(header -> messageSpy.spyMessage(
287                 header.getImplementedInterface(),
288                 header instanceof Error
289                         ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
290                         : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
291     }
292
293     @Override
294     public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
295         //1. translate to general flow (table, priority, match, cookie)
296         final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification =
297                 flowRemovedTranslator.translate(flowRemoved, deviceInfo, null);
298
299         if (isFlowRemovedNotificationOn) {
300             // Trigger off a notification
301             notificationPublishService.offerNotification(flowRemovedNotification);
302         }
303     }
304
305     @Override
306     @SuppressWarnings("checkstyle:IllegalCatch")
307     public void processPortStatusMessage(final PortStatusMessage portStatus) {
308         messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.StatisticsGroup
309                 .FROM_SWITCH_PUBLISHED_SUCCESS);
310
311         if (initialized.get()) {
312             try {
313                 writePortStatusMessage(portStatus);
314                 submitTransaction();
315             } catch (final Exception e) {
316                 LOG.warn("Error processing port status message for port {} on device {}",
317                         portStatus.getPortNo(), getDeviceInfo(), e);
318             }
319         } else if (!hasState.get()) {
320             primaryConnectionContext.handlePortStatusMessage(portStatus);
321         }
322     }
323
324     private void writePortStatusMessage(final PortStatus portStatusMessage) {
325         final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
326                 .translate(portStatusMessage, getDeviceInfo(), null);
327
328         final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
329                 .getNodeInstanceIdentifier()
330                 .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
331                         .nodeConnectorIdfromDatapathPortNo(
332                                 deviceInfo.getDatapathId(),
333                                 portStatusMessage.getPortNo(),
334                                 OpenflowVersion.get(deviceInfo.getVersion()))));
335
336         writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
337                 .setKey(iiToNodeConnector.getKey())
338                 .addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new
339                         FlowCapableNodeConnectorStatisticsDataBuilder().build())
340                 .addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector)
341                 .build());
342         submitTransaction();
343         if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
344             addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
345             submitTransaction();
346         }
347     }
348
349     @Override
350     public void processPacketInMessage(final PacketInMessage packetInMessage) {
351         final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
352         handlePacketInMessage(packetReceived, packetInMessage.getImplementedInterface(), packetReceived.getMatch());
353     }
354
355     private void handlePacketInMessage(final PacketIn packetIn,
356                                        final Class<?> implementedInterface,
357                                        final Match match) {
358         messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH);
359         final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
360
361         if (packetIn == null) {
362             LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
363             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
364             return;
365         }
366
367         final OpenflowVersion openflowVersion = OpenflowVersion.get(deviceInfo.getVersion());
368
369         // Try to get ingress from match
370         final NodeConnectorRef nodeConnectorRef = Objects.nonNull(packetIn.getIngress())
371                 ? packetIn.getIngress() : Optional.ofNullable(match)
372                 .map(Match::getInPort)
373                 .map(nodeConnectorId -> InventoryDataServiceUtil
374                         .portNumberfromNodeConnectorId(
375                                 openflowVersion,
376                                 nodeConnectorId))
377                 .map(portNumber -> InventoryDataServiceUtil
378                         .nodeConnectorRefFromDatapathIdPortno(
379                                 deviceInfo.getDatapathId(),
380                                 portNumber,
381                                 openflowVersion))
382                 .orElse(null);
383
384         messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
385
386         if (!packetInLimiter.acquirePermit()) {
387             LOG.debug("Packet limited");
388             // TODO: save packet into emergency slot if possible
389             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
390                     .FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
391             return;
392         }
393
394         final ListenableFuture<?> offerNotification = notificationPublishService
395                 .offerNotification(new PacketReceivedBuilder(packetIn)
396                         .setIngress(nodeConnectorRef)
397                         .setMatch(MatchUtil.transformMatch(match,
398                                 org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.packet.received
399                                         .Match.class))
400                         .build());
401
402         if (NotificationPublishService.REJECTED.equals(offerNotification)) {
403             LOG.debug("notification offer rejected");
404             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
405             packetInLimiter.drainLowWaterMark();
406             packetInLimiter.releasePermit();
407             return;
408         }
409
410         Futures.addCallback(offerNotification, new FutureCallback<Object>() {
411             @Override
412             public void onSuccess(final Object result) {
413                 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
414                 packetInLimiter.releasePermit();
415             }
416
417             @Override
418             public void onFailure(final Throwable throwable) {
419                 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
420                         .FROM_SWITCH_NOTIFICATION_REJECTED);
421                 LOG.debug("notification offer failed: {}", throwable.getMessage());
422                 LOG.trace("notification offer failed..", throwable);
423                 packetInLimiter.releasePermit();
424             }
425         }, MoreExecutors.directExecutor());
426     }
427
428     @Override
429     public void processExperimenterMessage(final ExperimenterMessage notification) {
430         // lookup converter
431         final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
432         final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
433                 getDeviceInfo().getVersion(),
434                 (Class<? extends ExperimenterDataOfChoice>) vendorData.getImplementedInterface());
435         final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter =
436                 extensionConverterProvider.getMessageConverter(key);
437         if (messageConverter == null) {
438             LOG.warn("custom converter for {}[OF:{}] not found",
439                     notification.getExperimenterDataOfChoice().getImplementedInterface(),
440                     getDeviceInfo().getVersion());
441             return;
442         }
443         // build notification
444         final ExperimenterMessageOfChoice messageOfChoice;
445         try {
446             messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
447             final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new
448                     ExperimenterMessageFromDevBuilder()
449                     .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
450                     .setExperimenterMessageOfChoice(messageOfChoice);
451             // publish
452             notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
453         } catch (final ConversionException e) {
454             LOG.error("Conversion of experimenter notification failed", e);
455         }
456     }
457
458     @Override
459     public boolean processAlienMessage(final OfHeader message) {
460         final Class<? extends DataContainer> implementedInterface = message.getImplementedInterface();
461
462         if (Objects.nonNull(implementedInterface) && org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
463                 .rev130709.PacketInMessage.class.equals(implementedInterface)) {
464             final org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
465                     .PacketInMessage packetInMessage = org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
466                     .rev130709.PacketInMessage.class.cast(message);
467
468             handlePacketInMessage(packetInMessage, implementedInterface, packetInMessage.getMatch());
469             return true;
470         }
471
472         return false;
473     }
474
475     @Override
476     public TranslatorLibrary oook() {
477         return translatorLibrary;
478     }
479
480     @Override
481     public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
482         this.notificationPublishService = notificationPublishService;
483     }
484
485     @Override
486     public MessageSpy getMessageSpy() {
487         return messageSpy;
488     }
489
490     @Override
491     public void onPublished() {
492         primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
493     }
494
495     @Override
496     public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>>
497                                                                                       requestContext) {
498         return new MultiMsgCollectorImpl<>(this, requestContext);
499     }
500
501     @Override
502     public void updatePacketInRateLimit(final long upperBound) {
503         packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound),
504                 (int) (HIGH_WATERMARK_FACTOR * upperBound));
505     }
506
507     @Override
508     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
509         this.extensionConverterProvider = extensionConverterProvider;
510     }
511
512     @Override
513     public ExtensionConverterProvider getExtensionConverterProvider() {
514         return extensionConverterProvider;
515     }
516
517     @VisibleForTesting
518     TransactionChainManager getTransactionChainManager() {
519         return this.transactionChainManager;
520     }
521
522     @Override
523     public ListenableFuture<Void> closeServiceInstance() {
524         final ListenableFuture<Void> listenableFuture = initialized.get()
525                 ? transactionChainManager.deactivateTransactionManager()
526                 : Futures.immediateFuture(null);
527
528         hashedWheelTimer.newTimeout((timerTask) -> {
529             if (!listenableFuture.isDone() && !listenableFuture.isCancelled()) {
530                 listenableFuture.cancel(true);
531             }
532         }, TX_CHAIN_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS);
533
534         return listenableFuture;
535     }
536
537     @Override
538     public DeviceInfo getDeviceInfo() {
539         return this.deviceInfo;
540     }
541
542     @Override
543     public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher newWatcher) {
544         this.contextChainMastershipWatcher = newWatcher;
545     }
546
547     @Nonnull
548     @Override
549     public ServiceGroupIdentifier getIdentifier() {
550         return deviceInfo.getServiceIdentifier();
551     }
552
553     @Override
554     public void close() {
555         // Close all datastore registries and transactions
556         if (initialized.getAndSet(false)) {
557             deviceGroupRegistry.close();
558             deviceFlowRegistry.close();
559             deviceMeterRegistry.close();
560
561             final ListenableFuture<Void> txChainShuttingDown = transactionChainManager.shuttingDown();
562
563             Futures.addCallback(txChainShuttingDown, new FutureCallback<Void>() {
564                 @Override
565                 public void onSuccess(@Nullable final Void result) {
566                     transactionChainManager.close();
567                     transactionChainManager = null;
568                 }
569
570                 @Override
571                 public void onFailure(final Throwable throwable) {
572                     transactionChainManager.close();
573                     transactionChainManager = null;
574                 }
575             }, MoreExecutors.directExecutor());
576         }
577
578         requestContexts.forEach(requestContext -> RequestContextUtil
579                 .closeRequestContextWithRpcError(requestContext, "Connection closed."));
580         requestContexts.clear();
581     }
582
583     @Override
584     public boolean canUseSingleLayerSerialization() {
585         return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
586     }
587
588     // TODO: exception handling should be fixed by using custom checked exception, never RuntimeExceptions
589     @Override
590     @SuppressWarnings({"checkstyle:IllegalCatch"})
591     public void instantiateServiceInstance() {
592         lazyTransactionManagerInitialization();
593
594         try {
595             final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
596                     .retrieveAndClearPortStatusMessages();
597
598             portStatusMessages.forEach(this::writePortStatusMessage);
599             submitTransaction();
600         } catch (final Exception ex) {
601             throw new RuntimeException(String.format("Error processing port status messages from device %s: %s",
602                     deviceInfo.toString(), ex.toString()), ex);
603         }
604
605         final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
606                 .lookup(deviceInfo.getVersion());
607
608         if (initializer.isPresent()) {
609             final Future<Void> initialize = initializer
610                     .get()
611                     .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor);
612
613             try {
614                 initialize.get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
615             } catch (TimeoutException ex) {
616                 initialize.cancel(true);
617                 throw new RuntimeException(String.format("Failed to initialize device %s in %ss: %s",
618                         deviceInfo.toString(), String.valueOf(DEVICE_INIT_TIMEOUT / 1000), ex.toString()), ex);
619             } catch (ExecutionException | InterruptedException ex) {
620                 throw new RuntimeException(
621                         String.format("Device %s cannot be initialized: %s", deviceInfo.toString(), ex.toString()), ex);
622             }
623         } else {
624             throw new RuntimeException(String.format("Unsupported version %s for device %s",
625                     deviceInfo.getVersion(),
626                     deviceInfo.toString()));
627         }
628
629         final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill =
630                 getDeviceFlowRegistry().fill();
631         Futures.addCallback(deviceFlowRegistryFill,
632                 new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher),
633                 MoreExecutors.directExecutor());
634     }
635
636     @VisibleForTesting
637     void lazyTransactionManagerInitialization() {
638         if (!this.initialized.get()) {
639             if (LOG.isDebugEnabled()) {
640                 LOG.debug("Transaction chain manager for node {} created", deviceInfo);
641             }
642             this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo.getNodeId().getValue());
643             this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker,
644                     deviceInfo.getNodeInstanceIdentifier());
645             this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
646             this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
647         }
648
649         transactionChainManager.activateTransactionManager();
650         initialized.set(true);
651     }
652
653     @Nullable
654     @Override
655     public <T> RequestContext<T> createRequestContext() {
656         final Long xid = deviceInfo.reserveXidForDeviceMessage();
657
658         final AbstractRequestContext<T> abstractRequestContext = new AbstractRequestContext<T>(xid) {
659             @Override
660             public void close() {
661                 requestContexts.remove(this);
662             }
663         };
664
665         requestContexts.add(abstractRequestContext);
666         return abstractRequestContext;
667     }
668
669     @Override
670     public void onStateAcquired(final ContextChainState state) {
671         hasState.set(true);
672     }
673
674     private class DeviceFlowRegistryCallback implements FutureCallback<List<com.google.common.base
675             .Optional<FlowCapableNode>>> {
676         private final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill;
677         private final ContextChainMastershipWatcher contextChainMastershipWatcher;
678
679         DeviceFlowRegistryCallback(
680                 ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill,
681                 ContextChainMastershipWatcher contextChainMastershipWatcher) {
682             this.deviceFlowRegistryFill = deviceFlowRegistryFill;
683             this.contextChainMastershipWatcher = contextChainMastershipWatcher;
684         }
685
686         @Override
687         public void onSuccess(@Nullable List<com.google.common.base.Optional<FlowCapableNode>> result) {
688             if (LOG.isDebugEnabled()) {
689                 // Count all flows we read from datastore for debugging purposes.
690                 // This number do not always represent how many flows were actually added
691                 // to DeviceFlowRegistry, because of possible duplicates.
692                 long flowCount = Optional.ofNullable(result)
693                         .map(Collections::singleton)
694                         .orElse(Collections.emptySet())
695                         .stream()
696                         .flatMap(Collection::stream)
697                         .filter(Objects::nonNull)
698                         .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
699                         .filter(Objects::nonNull)
700                         .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
701                         .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
702                         .filter(Objects::nonNull)
703                         .filter(table -> Objects.nonNull(table.getFlow()))
704                         .flatMap(table -> table.getFlow().stream())
705                         .filter(Objects::nonNull)
706                         .count();
707
708                 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo);
709             }
710             this.contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState
711                     .INITIAL_FLOW_REGISTRY_FILL);
712         }
713
714         @Override
715         public void onFailure(Throwable throwable) {
716             if (deviceFlowRegistryFill.isCancelled()) {
717                 if (LOG.isDebugEnabled()) {
718                     LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo);
719                 }
720             } else {
721                 LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo,
722                          throwable);
723             }
724             contextChainMastershipWatcher.onNotAbleToStartMastership(
725                     deviceInfo,
726                     "Was not able to fill flow registry on device",
727                     false);
728         }
729     }
730
731 }