b3c8a3445cd038cbbf8616ae6d79d021ca716f64
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import io.netty.util.HashedWheelTimer;
17 import java.util.Collection;
18 import java.util.Collections;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.Objects;
22 import java.util.Optional;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27 import java.util.concurrent.atomic.AtomicBoolean;
28 import javax.annotation.Nonnull;
29 import javax.annotation.Nullable;
30 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
31 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
32 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
33 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
34 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
35 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
36 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
37 import org.opendaylight.openflowplugin.api.OFConstants;
38 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
42 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
43 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
44 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
45 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
46 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
48 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder;
49 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
50 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
51 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
52 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
53 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
54 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
55 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
56 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
57 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
58 import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
59 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
60 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
61 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
62 import org.opendaylight.openflowplugin.extension.api.exception.ConversionException;
63 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
64 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
65 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
66 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
67 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
68 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
69 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
70 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
71 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
72 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
73 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
74 import org.opendaylight.openflowplugin.impl.util.MatchUtil;
75 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
76 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.Match;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketIn;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
102 import org.opendaylight.yangtools.yang.binding.DataContainer;
103 import org.opendaylight.yangtools.yang.binding.DataObject;
104 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
105 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
106 import org.slf4j.Logger;
107 import org.slf4j.LoggerFactory;
108
109 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper {
110
    private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);

    // Drain factor handed to the packet-in rate limiter when it is constructed.
    // TODO: drain factor should be parametrized
    private static final float REJECTED_DRAIN_FACTOR = 0.25f;
    // Multiplier applied to the upper bound in updatePacketInRateLimit() to get the low water mark.
    // TODO: low water mark factor should be parametrized
    private static final float LOW_WATERMARK_FACTOR = 0.75f;
    // Multiplier applied to the upper bound in updatePacketInRateLimit() to get the high water mark.
    // TODO: high water mark factor should be parametrized
    private static final float HIGH_WATERMARK_FACTOR = 0.95f;

    // Timeout in milliseconds after which we give up on initializing the device
    private static final int DEVICE_INIT_TIMEOUT = 9000;

    // Timeout in milliseconds after which we give up on closing the transaction chain
    private static final int TX_CHAIN_CLOSE_TIMEOUT = 10000;

    // Initial water marks of the packet-in rate limiter (see the constructor).
    private static final int LOW_WATERMARK = 1000;
    private static final int HIGH_WATERMARK = 2000;

    private final MultipartWriterProvider writerProvider;
    // Timer used by closeServiceInstance() to schedule the transaction chain close timeout.
    private final HashedWheelTimer hashedWheelTimer;
    private final DeviceState deviceState;
    private final DataBroker dataBroker;
    // Request contexts that close() fails with an RPC error and then clears.
    private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
    private final MessageSpy messageSpy;
    // Translators looked up once in the constructor for the device's OpenFlow version.
    private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
    private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
    private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight
            .flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
    private final TranslatorLibrary translatorLibrary;
    private final ConvertorExecutor convertorExecutor;
    private final DeviceInitializerProvider deviceInitializerProvider;
    private final PacketInRateLimiter packetInLimiter;
    private final DeviceInfo deviceInfo;
    private final ConnectionContext primaryConnectionContext;
    private final boolean skipTableFeatures;
    private final boolean switchFeaturesMandatory;
    // When false, processFlowRemovedMessage() does not publish a notification.
    private final boolean isFlowRemovedNotificationOn;
    private final boolean useSingleLayerSerialization;
    // Guards every use of transactionChainManager; reset to false by close().
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    // While this is false and the context is not initialized, processPortStatusMessage() forwards
    // port status messages to the primary connection context.
    private final AtomicBoolean hasState = new AtomicBoolean(false);
    // Result of the last initialSubmitTransaction(); reported by isTransactionsEnabled().
    private final AtomicBoolean isInitialTransactionSubmitted = new AtomicBoolean(false);
    // Used by isMasterOfDevice() to look up the context chain of this device.
    private final ContextChainHolder contextChainHolder;
    // Assigned through setNotificationPublishService(); not set by the constructor.
    private NotificationPublishService notificationPublishService;
    // Not set by the constructor; closed and set to null by close().
    private TransactionChainManager transactionChainManager;
    // The three registries are not set by the constructor; close() closes them.
    private DeviceFlowRegistry deviceFlowRegistry;
    private DeviceGroupRegistry deviceGroupRegistry;
    private DeviceMeterRegistry deviceMeterRegistry;
    // Assigned through setExtensionConverterProvider().
    private ExtensionConverterProvider extensionConverterProvider;
    // Assigned through registerMastershipWatcher().
    private ContextChainMastershipWatcher contextChainMastershipWatcher;
160
161     DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
162                       @Nonnull final DataBroker dataBroker,
163                       @Nonnull final MessageSpy messageSpy,
164                       @Nonnull final TranslatorLibrary translatorLibrary,
165                       final ConvertorExecutor convertorExecutor,
166                       final boolean skipTableFeatures,
167                       final HashedWheelTimer hashedWheelTimer,
168                       final boolean useSingleLayerSerialization,
169                       final DeviceInitializerProvider deviceInitializerProvider,
170                       final boolean isFlowRemovedNotificationOn,
171                       final boolean switchFeaturesMandatory,
172                       final ContextChainHolder contextChainHolder) {
173
174         this.primaryConnectionContext = primaryConnectionContext;
175         this.deviceInfo = primaryConnectionContext.getDeviceInfo();
176         this.hashedWheelTimer = hashedWheelTimer;
177         this.deviceInitializerProvider = deviceInitializerProvider;
178         this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
179         this.switchFeaturesMandatory = switchFeaturesMandatory;
180         this.deviceState = new DeviceStateImpl();
181         this.dataBroker = dataBroker;
182         this.messageSpy = messageSpy;
183         this.contextChainHolder = contextChainHolder;
184
185         this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
186                 /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
187
188         this.translatorLibrary = translatorLibrary;
189         this.portStatusTranslator = translatorLibrary.lookupTranslator(
190                 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
191         this.packetInTranslator = translatorLibrary.lookupTranslator(
192                 new TranslatorKey(deviceInfo.getVersion(), org.opendaylight.yang.gen.v1.urn.opendaylight.openflow
193                         .protocol.rev130731
194                         .PacketIn.class.getName()));
195         this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
196                 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
197
198         this.convertorExecutor = convertorExecutor;
199         this.skipTableFeatures = skipTableFeatures;
200         this.useSingleLayerSerialization = useSingleLayerSerialization;
201         writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
202     }
203
204     @Override
205     public boolean initialSubmitTransaction() {
206         if (!initialized.get()) {
207             return false;
208         }
209
210         final boolean initialSubmit = transactionChainManager.initialSubmitWriteTransaction();
211         isInitialTransactionSubmitted.set(initialSubmit);
212         return initialSubmit;
213     }
214
215     @Override
216     public DeviceState getDeviceState() {
217         return deviceState;
218     }
219
220     @Override
221     public ReadOnlyTransaction getReadTransaction() {
222         return dataBroker.newReadOnlyTransaction();
223     }
224
225     @Override
226     public boolean isTransactionsEnabled() {
227         return isInitialTransactionSubmitted.get();
228     }
229
230     @Override
231     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
232                                                           final InstanceIdentifier<T> path,
233                                                           final T data) {
234         if (initialized.get()) {
235             transactionChainManager.writeToTransaction(store, path, data, false);
236         }
237     }
238
239     @Override
240     public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
241                                                                          final InstanceIdentifier<T> path,
242                                                                          final T data) {
243         if (initialized.get()) {
244             transactionChainManager.writeToTransaction(store, path, data, true);
245         }
246     }
247
248     @Override
249     public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store,
250                                                           final InstanceIdentifier<T> path) {
251         if (initialized.get()) {
252             transactionChainManager.addDeleteOperationToTxChain(store, path);
253         }
254     }
255
256     @Override
257     public boolean submitTransaction() {
258         return initialized.get() && transactionChainManager.submitTransaction();
259     }
260
261     @Override
262     public boolean syncSubmitTransaction() {
263         return initialized.get() && transactionChainManager.submitTransaction(true);
264     }
265
266     @Override
267     public ConnectionContext getPrimaryConnectionContext() {
268         return primaryConnectionContext;
269     }
270
271     @Override
272     public DeviceFlowRegistry getDeviceFlowRegistry() {
273         return deviceFlowRegistry;
274     }
275
276     @Override
277     public DeviceGroupRegistry getDeviceGroupRegistry() {
278         return deviceGroupRegistry;
279     }
280
281     @Override
282     public DeviceMeterRegistry getDeviceMeterRegistry() {
283         return deviceMeterRegistry;
284     }
285
286     @Override
287     public void processReply(final OfHeader ofHeader) {
288         messageSpy.spyMessage(
289                 ofHeader.getImplementedInterface(),
290                 ofHeader instanceof Error
291                         ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
292                         : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
293     }
294
295     @Override
296     public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
297         ofHeaderList.forEach(header -> messageSpy.spyMessage(
298                 header.getImplementedInterface(),
299                 header instanceof Error
300                         ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
301                         : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
302     }
303
304     @Override
305     public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
306         if (isMasterOfDevice()) {
307             //1. translate to general flow (table, priority, match, cookie)
308             final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819
309                     .FlowRemoved flowRemovedNotification = flowRemovedTranslator
310                     .translate(flowRemoved, deviceInfo, null);
311
312             if (isFlowRemovedNotificationOn) {
313                 // Trigger off a notification
314                 notificationPublishService.offerNotification(flowRemovedNotification);
315             }
316         } else {
317             LOG.debug("Controller is not owner of the device {}, skipping Flow Removed message",
318                     deviceInfo.getLOGValue());
319         }
320     }
321
322     @Override
323     @SuppressWarnings("checkstyle:IllegalCatch")
324     public void processPortStatusMessage(final PortStatusMessage portStatus) {
325         messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.StatisticsGroup
326                 .FROM_SWITCH_PUBLISHED_SUCCESS);
327
328         if (initialized.get()) {
329             try {
330                 writePortStatusMessage(portStatus);
331             } catch (final Exception e) {
332                 LOG.warn("Error processing port status message for port {} on device {}",
333                         portStatus.getPortNo(), getDeviceInfo(), e);
334             }
335         } else if (!hasState.get()) {
336             primaryConnectionContext.handlePortStatusMessage(portStatus);
337         }
338     }
339
340     private void writePortStatusMessage(final PortStatus portStatusMessage) {
341         final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
342                 .translate(portStatusMessage, getDeviceInfo(), null);
343
344         final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
345                 .getNodeInstanceIdentifier()
346                 .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
347                         .nodeConnectorIdfromDatapathPortNo(
348                                 deviceInfo.getDatapathId(),
349                                 portStatusMessage.getPortNo(),
350                                 OpenflowVersion.get(deviceInfo.getVersion()))));
351
352         writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
353                 .setKey(iiToNodeConnector.getKey())
354                 .addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new
355                         FlowCapableNodeConnectorStatisticsDataBuilder().build())
356                 .addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector)
357                 .build());
358         syncSubmitTransaction();
359         if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
360             addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
361             syncSubmitTransaction();
362         }
363     }
364
365     @Override
366     public void processPacketInMessage(final PacketInMessage packetInMessage) {
367         if (isMasterOfDevice()) {
368             final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
369             handlePacketInMessage(packetReceived, packetInMessage.getImplementedInterface(), packetReceived.getMatch());
370         } else {
371             LOG.debug("Controller is not owner of the device {}, skipping packet_in message", deviceInfo.getLOGValue());
372         }
373     }
374
375     private Boolean isMasterOfDevice() {
376         final ContextChain contextChain = contextChainHolder.getContextChain(deviceInfo);
377         boolean result = false;
378         if (contextChain != null) {
379             result = contextChain.isMastered(ContextChainMastershipState.CHECK, false);
380         }
381         return result;
382     }
383
384     private void handlePacketInMessage(final PacketIn packetIn,
385                                        final Class<?> implementedInterface,
386                                        final Match match) {
387         messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH);
388         final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
389
390         if (packetIn == null) {
391             LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
392             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
393             return;
394         }
395
396         final OpenflowVersion openflowVersion = OpenflowVersion.get(deviceInfo.getVersion());
397
398         // Try to get ingress from match
399         final NodeConnectorRef nodeConnectorRef = Objects.nonNull(packetIn.getIngress())
400                 ? packetIn.getIngress() : Optional.ofNullable(match)
401                 .map(Match::getInPort)
402                 .map(nodeConnectorId -> InventoryDataServiceUtil
403                         .portNumberfromNodeConnectorId(
404                                 openflowVersion,
405                                 nodeConnectorId))
406                 .map(portNumber -> InventoryDataServiceUtil
407                         .nodeConnectorRefFromDatapathIdPortno(
408                                 deviceInfo.getDatapathId(),
409                                 portNumber,
410                                 openflowVersion))
411                 .orElse(null);
412
413         messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
414
415         if (!packetInLimiter.acquirePermit()) {
416             LOG.debug("Packet limited");
417             // TODO: save packet into emergency slot if possible
418             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
419                     .FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
420             return;
421         }
422
423         final ListenableFuture<?> offerNotification = notificationPublishService
424                 .offerNotification(new PacketReceivedBuilder(packetIn)
425                         .setIngress(nodeConnectorRef)
426                         .setMatch(MatchUtil.transformMatch(match,
427                                 org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.packet.received
428                                         .Match.class))
429                         .build());
430
431         if (NotificationPublishService.REJECTED.equals(offerNotification)) {
432             LOG.debug("notification offer rejected");
433             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
434             packetInLimiter.drainLowWaterMark();
435             packetInLimiter.releasePermit();
436             return;
437         }
438
439         Futures.addCallback(offerNotification, new FutureCallback<Object>() {
440             @Override
441             public void onSuccess(final Object result) {
442                 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
443                 packetInLimiter.releasePermit();
444             }
445
446             @Override
447             public void onFailure(final Throwable throwable) {
448                 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
449                         .FROM_SWITCH_NOTIFICATION_REJECTED);
450                 LOG.debug("notification offer failed: {}", throwable.getMessage());
451                 LOG.trace("notification offer failed..", throwable);
452                 packetInLimiter.releasePermit();
453             }
454         }, MoreExecutors.directExecutor());
455     }
456
457     @Override
458     public void processExperimenterMessage(final ExperimenterMessage notification) {
459         if (isMasterOfDevice()) {
460             // lookup converter
461             final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
462             final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
463                     getDeviceInfo().getVersion(),
464                     (Class<? extends ExperimenterDataOfChoice>) vendorData.getImplementedInterface());
465             final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter =
466                     extensionConverterProvider.getMessageConverter(key);
467             if (messageConverter == null) {
468                 LOG.warn("custom converter for {}[OF:{}] not found",
469                         notification.getExperimenterDataOfChoice().getImplementedInterface(),
470                         getDeviceInfo().getVersion());
471                 return;
472             }
473             // build notification
474             final ExperimenterMessageOfChoice messageOfChoice;
475             try {
476                 messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
477                 final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new
478                         ExperimenterMessageFromDevBuilder()
479                         .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
480                         .setExperimenterMessageOfChoice(messageOfChoice);
481                 // publish
482                 notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
483             } catch (final ConversionException e) {
484                 LOG.error("Conversion of experimenter notification failed", e);
485             }
486         } else {
487             LOG.debug("Controller is not owner of the device {}, skipping experimenter message",
488                     deviceInfo.getLOGValue());
489         }
490     }
491
492     @Override
493     // The cast to PacketInMessage is safe as the implemented interface is verified before the cas tbut FB doesn't
494     // recognize it.
495     @SuppressFBWarnings("BC_UNCONFIRMED_CAST")
496     public boolean processAlienMessage(final OfHeader message) {
497         final Class<? extends DataContainer> implementedInterface = message.getImplementedInterface();
498
499         if (Objects.nonNull(implementedInterface) && org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
500                 .rev130709.PacketInMessage.class.equals(implementedInterface)) {
501             final org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
502                     .PacketInMessage packetInMessage = org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
503                     .rev130709.PacketInMessage.class.cast(message);
504
505             handlePacketInMessage(packetInMessage, implementedInterface, packetInMessage.getMatch());
506             return true;
507         }
508
509         return false;
510     }
511
512     @Override
513     public TranslatorLibrary oook() {
514         return translatorLibrary;
515     }
516
517     @Override
518     public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
519         this.notificationPublishService = notificationPublishService;
520     }
521
522     @Override
523     public MessageSpy getMessageSpy() {
524         return messageSpy;
525     }
526
527     @Override
528     public void onPublished() {
529         primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
530     }
531
532     @Override
533     public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>>
534                                                                                       requestContext) {
535         return new MultiMsgCollectorImpl<>(this, requestContext);
536     }
537
538     @Override
539     public void updatePacketInRateLimit(final long upperBound) {
540         packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound),
541                 (int) (HIGH_WATERMARK_FACTOR * upperBound));
542     }
543
544     @Override
545     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
546         this.extensionConverterProvider = extensionConverterProvider;
547     }
548
549     @Override
550     public ExtensionConverterProvider getExtensionConverterProvider() {
551         return extensionConverterProvider;
552     }
553
554     @VisibleForTesting
555     TransactionChainManager getTransactionChainManager() {
556         return this.transactionChainManager;
557     }
558
559     @Override
560     public ListenableFuture<Void> closeServiceInstance() {
561         final ListenableFuture<Void> listenableFuture = initialized.get()
562                 ? transactionChainManager.deactivateTransactionManager()
563                 : Futures.immediateFuture(null);
564
565         hashedWheelTimer.newTimeout((timerTask) -> {
566             if (!listenableFuture.isDone() && !listenableFuture.isCancelled()) {
567                 listenableFuture.cancel(true);
568             }
569         }, TX_CHAIN_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS);
570
571         return listenableFuture;
572     }
573
574     @Override
575     public DeviceInfo getDeviceInfo() {
576         return this.deviceInfo;
577     }
578
579     @Override
580     public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher newWatcher) {
581         this.contextChainMastershipWatcher = newWatcher;
582     }
583
584     @Nonnull
585     @Override
586     public ServiceGroupIdentifier getIdentifier() {
587         return deviceInfo.getServiceIdentifier();
588     }
589
590     @Override
591     public void close() {
592         // Close all datastore registries and transactions
593         if (initialized.getAndSet(false)) {
594             deviceGroupRegistry.close();
595             deviceFlowRegistry.close();
596             deviceMeterRegistry.close();
597
598             final ListenableFuture<Void> txChainShuttingDown = transactionChainManager.shuttingDown();
599
600             Futures.addCallback(txChainShuttingDown, new FutureCallback<Void>() {
601                 @Override
602                 public void onSuccess(@Nullable final Void result) {
603                     transactionChainManager.close();
604                     transactionChainManager = null;
605                 }
606
607                 @Override
608                 public void onFailure(final Throwable throwable) {
609                     transactionChainManager.close();
610                     transactionChainManager = null;
611                 }
612             }, MoreExecutors.directExecutor());
613         }
614
615         requestContexts.forEach(requestContext -> RequestContextUtil
616                 .closeRequestContextWithRpcError(requestContext, "Connection closed."));
617         requestContexts.clear();
618     }
619
620     @Override
621     public boolean canUseSingleLayerSerialization() {
622         return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
623     }
624
625     // TODO: exception handling should be fixed by using custom checked exception, never RuntimeExceptions
626     @Override
627     @SuppressWarnings({"checkstyle:IllegalCatch"})
628     public void instantiateServiceInstance() {
629         lazyTransactionManagerInitialization();
630
631         try {
632             final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
633                     .retrieveAndClearPortStatusMessages();
634
635             portStatusMessages.forEach(this::writePortStatusMessage);
636             submitTransaction();
637         } catch (final Exception ex) {
638             throw new RuntimeException(String.format("Error processing port status messages from device %s: %s",
639                     deviceInfo.toString(), ex.toString()), ex);
640         }
641
642         final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
643                 .lookup(deviceInfo.getVersion());
644
645         if (initializer.isPresent()) {
646             final Future<Void> initialize = initializer
647                     .get()
648                     .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor);
649
650             try {
651                 initialize.get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
652             } catch (TimeoutException ex) {
653                 initialize.cancel(true);
654                 throw new RuntimeException(String.format("Failed to initialize device %s in %ss: %s",
655                         deviceInfo.toString(), String.valueOf(DEVICE_INIT_TIMEOUT / 1000), ex.toString()), ex);
656             } catch (ExecutionException | InterruptedException ex) {
657                 throw new RuntimeException(
658                         String.format("Device %s cannot be initialized: %s", deviceInfo.toString(), ex.toString()), ex);
659             }
660         } else {
661             throw new RuntimeException(String.format("Unsupported version %s for device %s",
662                     deviceInfo.getVersion(),
663                     deviceInfo.toString()));
664         }
665
666         final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill =
667                 getDeviceFlowRegistry().fill();
668         Futures.addCallback(deviceFlowRegistryFill,
669                 new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher),
670                 MoreExecutors.directExecutor());
671     }
672
673     @VisibleForTesting
674     void lazyTransactionManagerInitialization() {
675         if (!this.initialized.get()) {
676             if (LOG.isDebugEnabled()) {
677                 LOG.debug("Transaction chain manager for node {} created", deviceInfo);
678             }
679             this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo.getNodeId().getValue());
680             this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker,
681                     deviceInfo.getNodeInstanceIdentifier());
682             this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
683             this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
684         }
685
686         transactionChainManager.activateTransactionManager();
687         initialized.set(true);
688     }
689
690     @Nullable
691     @Override
692     public <T> RequestContext<T> createRequestContext() {
693         final Long xid = deviceInfo.reserveXidForDeviceMessage();
694
695         final AbstractRequestContext<T> abstractRequestContext = new AbstractRequestContext<T>(xid) {
696             @Override
697             public void close() {
698                 requestContexts.remove(this);
699             }
700         };
701
702         requestContexts.add(abstractRequestContext);
703         return abstractRequestContext;
704     }
705
706     @Override
707     public void onStateAcquired(final ContextChainState state) {
708         hasState.set(true);
709     }
710
711     private class DeviceFlowRegistryCallback implements FutureCallback<List<com.google.common.base
712             .Optional<FlowCapableNode>>> {
713         private final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill;
714         private final ContextChainMastershipWatcher contextChainMastershipWatcher;
715
716         DeviceFlowRegistryCallback(
717                 ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill,
718                 ContextChainMastershipWatcher contextChainMastershipWatcher) {
719             this.deviceFlowRegistryFill = deviceFlowRegistryFill;
720             this.contextChainMastershipWatcher = contextChainMastershipWatcher;
721         }
722
723         @Override
724         public void onSuccess(@Nullable List<com.google.common.base.Optional<FlowCapableNode>> result) {
725             if (LOG.isDebugEnabled()) {
726                 // Count all flows we read from datastore for debugging purposes.
727                 // This number do not always represent how many flows were actually added
728                 // to DeviceFlowRegistry, because of possible duplicates.
729                 long flowCount = Optional.ofNullable(result)
730                         .map(Collections::singleton)
731                         .orElse(Collections.emptySet())
732                         .stream()
733                         .flatMap(Collection::stream)
734                         .filter(Objects::nonNull)
735                         .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
736                         .filter(Objects::nonNull)
737                         .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
738                         .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
739                         .filter(Objects::nonNull)
740                         .filter(table -> Objects.nonNull(table.getFlow()))
741                         .flatMap(table -> table.getFlow().stream())
742                         .filter(Objects::nonNull)
743                         .count();
744
745                 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo);
746             }
747             this.contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState
748                     .INITIAL_FLOW_REGISTRY_FILL);
749         }
750
751         @Override
752         public void onFailure(Throwable throwable) {
753             if (deviceFlowRegistryFill.isCancelled()) {
754                 if (LOG.isDebugEnabled()) {
755                     LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo);
756                 }
757             } else {
758                 LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo,
759                          throwable);
760             }
761             contextChainMastershipWatcher.onNotAbleToStartMastership(
762                     deviceInfo,
763                     "Was not able to fill flow registry on device",
764                     false);
765         }
766     }
767
768 }