Modernize codebase
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import io.netty.util.HashedWheelTimer;
17 import java.util.Collection;
18 import java.util.List;
19 import java.util.Objects;
20 import java.util.Optional;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.opendaylight.mdsal.binding.api.DataBroker;
29 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
30 import org.opendaylight.mdsal.binding.api.ReadTransaction;
31 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
32 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
33 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
34 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
35 import org.opendaylight.openflowplugin.api.OFConstants;
36 import org.opendaylight.openflowplugin.api.openflow.FlowGroupInfoHistory;
37 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
41 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
42 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
43 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
44 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
45 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
46 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder;
48 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
49 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
50 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
51 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
52 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
53 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
54 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
55 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
56 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
57 import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
58 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
59 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
60 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
61 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
62 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
63 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
64 import org.opendaylight.openflowplugin.impl.device.history.FlowGroupInfoHistoryImpl;
65 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
66 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
67 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
68 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
69 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
70 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
71 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
72 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
73 import org.opendaylight.openflowplugin.impl.util.MatchUtil;
74 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
75 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.Match;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketIn;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
101 import org.opendaylight.yangtools.util.concurrent.NotificationManager;
102 import org.opendaylight.yangtools.yang.binding.DataContainer;
103 import org.opendaylight.yangtools.yang.binding.DataObject;
104 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
105 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
106 import org.opendaylight.yangtools.yang.common.Uint32;
107 import org.slf4j.Logger;
108 import org.slf4j.LoggerFactory;
109
110 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper {
111     // FIXME: make this configurable and expose a different implementation at least for OSGi when this is switched off
112     private static final int FLOWGROUP_CACHE_SIZE = 10000;
113
114     private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
115     private static final Logger OF_EVENT_LOG = LoggerFactory.getLogger("OfEventLog");
116
117     // TODO: drain factor should be parametrized
118     private static final float REJECTED_DRAIN_FACTOR = 0.25f;
119     // TODO: low water mark factor should be parametrized
120     private static final float LOW_WATERMARK_FACTOR = 0.75f;
121     // TODO: high water mark factor should be parametrized
122     private static final float HIGH_WATERMARK_FACTOR = 0.95f;
123
124     // Timeout in milliseconds after which we will give up on initializing the device
125     private static final int DEVICE_INIT_TIMEOUT = 9000;
126
127     // Timeout in milliseconds after which we will give up on closing the transaction chain
128     private static final int TX_CHAIN_CLOSE_TIMEOUT = 10000;
129
130     private static final int LOW_WATERMARK = 1000;
131     private static final int HIGH_WATERMARK = 2000;
132
133     private final MultipartWriterProvider writerProvider;
134     private final HashedWheelTimer hashedWheelTimer;
135     private final DeviceState deviceState;
136     private final DataBroker dataBroker;
137     private final Collection<RequestContext<?>> requestContexts = ConcurrentHashMap.newKeySet();
138     private final MessageSpy messageSpy;
139     private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
140     private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
141     private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight
142             .flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
143     private final TranslatorLibrary translatorLibrary;
144     private final ConvertorExecutor convertorExecutor;
145     private final DeviceInitializerProvider deviceInitializerProvider;
146     private final PacketInRateLimiter packetInLimiter;
147     private final DeviceInfo deviceInfo;
148     private final ConnectionContext primaryConnectionContext;
149     private final boolean skipTableFeatures;
150     private final boolean switchFeaturesMandatory;
151     private final boolean isFlowRemovedNotificationOn;
152     private final boolean useSingleLayerSerialization;
153     private final AtomicBoolean initialized = new AtomicBoolean(false);
154     private final AtomicBoolean hasState = new AtomicBoolean(false);
155     private final AtomicBoolean isInitialTransactionSubmitted = new AtomicBoolean(false);
156     private final ContextChainHolder contextChainHolder;
157     private NotificationPublishService notificationPublishService;
158     private TransactionChainManager transactionChainManager;
159     private DeviceFlowRegistry deviceFlowRegistry;
160     private DeviceGroupRegistry deviceGroupRegistry;
161     private DeviceMeterRegistry deviceMeterRegistry;
162     private ExtensionConverterProvider extensionConverterProvider;
163     private ContextChainMastershipWatcher contextChainMastershipWatcher;
164     private FlowGroupInfoHistoryImpl history;
165     private final NotificationManager<String, Runnable> queuedNotificationManager;
166     private final boolean isStatisticsPollingOn;
167
168     DeviceContextImpl(@NonNull final ConnectionContext primaryConnectionContext,
169                       @NonNull final DataBroker dataBroker,
170                       @NonNull final MessageSpy messageSpy,
171                       @NonNull final TranslatorLibrary translatorLibrary,
172                       final ConvertorExecutor convertorExecutor,
173                       final boolean skipTableFeatures,
174                       final HashedWheelTimer hashedWheelTimer,
175                       final boolean useSingleLayerSerialization,
176                       final DeviceInitializerProvider deviceInitializerProvider,
177                       final boolean isFlowRemovedNotificationOn,
178                       final boolean switchFeaturesMandatory,
179                       final ContextChainHolder contextChainHolder,
180                       final NotificationManager<String, Runnable> queuedNotificationManager,
181                       final boolean isStatisticsPollingOn) {
182         this.primaryConnectionContext = primaryConnectionContext;
183         deviceInfo = primaryConnectionContext.getDeviceInfo();
184         this.hashedWheelTimer = hashedWheelTimer;
185         this.deviceInitializerProvider = deviceInitializerProvider;
186         this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
187         this.switchFeaturesMandatory = switchFeaturesMandatory;
188         deviceState = new DeviceStateImpl();
189         this.dataBroker = dataBroker;
190         this.messageSpy = messageSpy;
191         this.isStatisticsPollingOn = isStatisticsPollingOn;
192         this.contextChainHolder = contextChainHolder;
193
194         packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
195                 /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
196
197         this.translatorLibrary = translatorLibrary;
198         portStatusTranslator = translatorLibrary.lookupTranslator(
199                 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
200         packetInTranslator = translatorLibrary.lookupTranslator(
201                 new TranslatorKey(deviceInfo.getVersion(), org.opendaylight.yang.gen.v1.urn.opendaylight.openflow
202                         .protocol.rev130731
203                         .PacketIn.class.getName()));
204         flowRemovedTranslator = translatorLibrary.lookupTranslator(
205                 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
206
207         this.convertorExecutor = convertorExecutor;
208         this.skipTableFeatures = skipTableFeatures;
209         this.useSingleLayerSerialization = useSingleLayerSerialization;
210         this.queuedNotificationManager = queuedNotificationManager;
211         writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
212     }
213
214     @Override
215     public boolean initialSubmitTransaction() {
216         if (!initialized.get()) {
217             return false;
218         }
219
220         final boolean initialSubmit = transactionChainManager.initialSubmitWriteTransaction();
221         isInitialTransactionSubmitted.set(initialSubmit);
222         return initialSubmit;
223     }
224
225     @Override
226     public DeviceState getDeviceState() {
227         return deviceState;
228     }
229
230     @Override
231     public ReadTransaction getReadTransaction() {
232         return dataBroker.newReadOnlyTransaction();
233     }
234
235     @Override
236     public boolean isTransactionsEnabled() {
237         return isInitialTransactionSubmitted.get();
238     }
239
240     @Override
241     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
242                                                           final InstanceIdentifier<T> path,
243                                                           final T data) {
244         if (initialized.get()) {
245             transactionChainManager.writeToTransaction(store, path, data, false);
246         }
247     }
248
249     @Override
250     public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
251                                                                          final InstanceIdentifier<T> path,
252                                                                          final T data) {
253         if (initialized.get()) {
254             transactionChainManager.writeToTransaction(store, path, data, true);
255         }
256     }
257
258     @Override
259     public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store,
260                                                           final InstanceIdentifier<T> path) {
261         if (initialized.get()) {
262             transactionChainManager.addDeleteOperationToTxChain(store, path);
263         }
264     }
265
266     @Override
267     public boolean submitTransaction() {
268         return initialized.get() && transactionChainManager.submitTransaction();
269     }
270
271     @Override
272     public boolean syncSubmitTransaction() {
273         return initialized.get() && transactionChainManager.submitTransaction(true);
274     }
275
276     @Override
277     public ConnectionContext getPrimaryConnectionContext() {
278         return primaryConnectionContext;
279     }
280
281     @Override
282     public DeviceFlowRegistry getDeviceFlowRegistry() {
283         return deviceFlowRegistry;
284     }
285
286     @Override
287     public DeviceGroupRegistry getDeviceGroupRegistry() {
288         return deviceGroupRegistry;
289     }
290
291     @Override
292     public boolean isStatisticsPollingOn() {
293         return isStatisticsPollingOn;
294     }
295
296     @Override
297     public DeviceMeterRegistry getDeviceMeterRegistry() {
298         return deviceMeterRegistry;
299     }
300
301     @Override
302     public void processReply(final OfHeader ofHeader) {
303         messageSpy.spyMessage(
304                 ofHeader.implementedInterface(),
305                 ofHeader instanceof Error
306                         ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
307                         : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
308     }
309
310     @Override
311     public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
312         ofHeaderList.forEach(header -> messageSpy.spyMessage(
313                 header.implementedInterface(),
314                 header instanceof Error
315                         ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
316                         : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
317     }
318
319     @Override
320     public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
321         if (isMasterOfDevice()) {
322             //1. translate to general flow (table, priority, match, cookie)
323             final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819
324                     .FlowRemoved flowRemovedNotification = flowRemovedTranslator
325                     .translate(flowRemoved, deviceInfo, null);
326
327             if (isFlowRemovedNotificationOn) {
328                 // Trigger off a notification
329                 notificationPublishService.offerNotification(flowRemovedNotification);
330             }
331         } else {
332             LOG.debug("Controller is not owner of the device {}, skipping Flow Removed message",
333                     deviceInfo.getLOGValue());
334         }
335     }
336
337     @Override
338     @SuppressWarnings("checkstyle:IllegalCatch")
339     public void processPortStatusMessage(final PortStatusMessage portStatus) {
340         messageSpy.spyMessage(portStatus.implementedInterface(), MessageSpy.StatisticsGroup
341                 .FROM_SWITCH_PUBLISHED_SUCCESS);
342
343         if (initialized.get()) {
344             try {
345                 writePortStatusMessage(portStatus);
346             } catch (final Exception e) {
347                 LOG.warn("Error processing port status message for port {} on device {}",
348                         portStatus.getPortNo(), getDeviceInfo(), e);
349             }
350         } else if (!hasState.get()) {
351             primaryConnectionContext.handlePortStatusMessage(portStatus);
352         }
353     }
354
355     @SuppressWarnings("checkstyle:IllegalCatch")
356     private void writePortStatusMessage(final PortStatus portStatusMessage) {
357         String datapathId = deviceInfo.getDatapathId().toString().intern();
358         queuedNotificationManager.submitNotification(datapathId, () -> {
359             try {
360                 acquireWriteTransactionLock();
361                 final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
362                         .translate(portStatusMessage, getDeviceInfo(), null);
363                 OF_EVENT_LOG.debug("Node Connector Status, Node: {}, PortNumber: {}, PortName: {}, Reason: {}",
364                         deviceInfo.getDatapathId(), portStatusMessage.getPortNo(), portStatusMessage.getName(),
365                         portStatusMessage.getReason());
366
367                 final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
368                         .getNodeInstanceIdentifier()
369                         .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
370                                 .nodeConnectorIdfromDatapathPortNo(
371                                         deviceInfo.getDatapathId(),
372                                         portStatusMessage.getPortNo(),
373                                         OpenflowVersion.get(deviceInfo.getVersion()))));
374
375                 writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
376                         .withKey(iiToNodeConnector.getKey())
377                         .addAugmentation(new FlowCapableNodeConnectorStatisticsDataBuilder().build())
378                         .addAugmentation(flowCapableNodeConnector)
379                         .build());
380                 syncSubmitTransaction();
381                 if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
382                     addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
383                     syncSubmitTransaction();
384                 }
385             } catch (final Exception e) {
386                 LOG.warn("Error processing port status message for port {} on device {}",
387                         portStatusMessage.getPortNo(), datapathId, e);
388             } finally {
389                 releaseWriteTransactionLock();
390             }
391         });
392     }
393
394     @Override
395     public void processPacketInMessage(final PacketInMessage packetInMessage) {
396         if (isMasterOfDevice()) {
397             final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
398             handlePacketInMessage(packetReceived, packetInMessage.implementedInterface(), packetReceived.getMatch());
399         } else {
400             LOG.debug("Controller is not owner of the device {}, skipping packet_in message", deviceInfo.getLOGValue());
401         }
402     }
403
404     private Boolean isMasterOfDevice() {
405         final ContextChain contextChain = contextChainHolder.getContextChain(deviceInfo);
406         boolean result = false;
407         if (contextChain != null) {
408             result = contextChain.isMastered(ContextChainMastershipState.CHECK, false);
409         }
410         return result;
411     }
412
413     private void handlePacketInMessage(final PacketIn packetIn,
414                                        final Class<?> implementedInterface,
415                                        final Match match) {
416         messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH);
417         final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
418
419         if (packetIn == null) {
420             LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
421             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
422             return;
423         }
424
425         final OpenflowVersion openflowVersion = OpenflowVersion.get(deviceInfo.getVersion());
426
427         // Try to get ingress from match
428         final NodeConnectorRef nodeConnectorRef = packetIn.getIngress() != null
429                 ? packetIn.getIngress() : Optional.ofNullable(match)
430                 .map(Match::getInPort)
431                 .map(nodeConnectorId -> InventoryDataServiceUtil
432                         .portNumberfromNodeConnectorId(
433                                 openflowVersion,
434                                 nodeConnectorId))
435                 .map(portNumber -> InventoryDataServiceUtil
436                         .nodeConnectorRefFromDatapathIdPortno(
437                                 deviceInfo.getDatapathId(),
438                                 portNumber,
439                                 openflowVersion))
440                 .orElse(null);
441
442         messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
443
444         if (!packetInLimiter.acquirePermit()) {
445             LOG.debug("Packet limited");
446             // TODO: save packet into emergency slot if possible
447             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
448                     .FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
449             return;
450         }
451
452         final ListenableFuture<?> offerNotification = notificationPublishService
453                 .offerNotification(new PacketReceivedBuilder(packetIn)
454                         .setIngress(nodeConnectorRef)
455                         .setMatch(MatchUtil.transformMatch(match,
456                                 org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.packet.received
457                                         .Match.class))
458                         .build());
459
460         if (NotificationPublishService.REJECTED.equals(offerNotification)) {
461             LOG.debug("notification offer rejected");
462             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
463             packetInLimiter.drainLowWaterMark();
464             packetInLimiter.releasePermit();
465             return;
466         }
467
468         Futures.addCallback(offerNotification, new FutureCallback<Object>() {
469             @Override
470             public void onSuccess(final Object result) {
471                 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
472                 packetInLimiter.releasePermit();
473             }
474
475             @Override
476             public void onFailure(final Throwable throwable) {
477                 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
478                         .FROM_SWITCH_NOTIFICATION_REJECTED);
479                 LOG.debug("notification offer failed: {}", throwable.getMessage());
480                 LOG.trace("notification offer failed..", throwable);
481                 packetInLimiter.releasePermit();
482             }
483         }, MoreExecutors.directExecutor());
484     }
485
486     @Override
487     public void processExperimenterMessage(final ExperimenterMessage notification) {
488         if (isMasterOfDevice()) {
489             // lookup converter
490             final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
491             final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
492                     getDeviceInfo().getVersion(),
493                     (Class<? extends ExperimenterDataOfChoice>) vendorData.implementedInterface());
494             final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter =
495                     extensionConverterProvider.getMessageConverter(key);
496             if (messageConverter == null) {
497                 LOG.warn("custom converter for {}[OF:{}] not found",
498                         notification.getExperimenterDataOfChoice().implementedInterface(),
499                         getDeviceInfo().getVersion());
500                 return;
501             }
502             // build notification
503             final ExperimenterMessageOfChoice messageOfChoice;
504             messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
505             final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new
506                     ExperimenterMessageFromDevBuilder()
507                     .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
508                     .setExperimenterMessageOfChoice(messageOfChoice);
509             // publish
510             notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
511         } else {
512             LOG.debug("Controller is not owner of the device {}, skipping experimenter message",
513                     deviceInfo.getLOGValue());
514         }
515     }
516
517     @Override
518     // The cast to PacketInMessage is safe as the implemented interface is verified before the cas tbut FB doesn't
519     // recognize it.
520     @SuppressFBWarnings("BC_UNCONFIRMED_CAST")
521     public boolean processAlienMessage(final OfHeader message) {
522         final Class<? extends DataContainer> implementedInterface = message.implementedInterface();
523
524         if (org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketInMessage.class
525                 .equals(implementedInterface)) {
526             final org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
527                     .PacketInMessage packetInMessage = (org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
528                 .rev130709.PacketInMessage) message;
529
530             handlePacketInMessage(packetInMessage, implementedInterface, packetInMessage.getMatch());
531             return true;
532         }
533
534         return false;
535     }
536
537     @Override
538     public TranslatorLibrary oook() {
539         return translatorLibrary;
540     }
541
542     @Override
543     public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
544         this.notificationPublishService = notificationPublishService;
545     }
546
547     @Override
548     public MessageSpy getMessageSpy() {
549         return messageSpy;
550     }
551
552     @Override
553     public void onPublished() {
554         primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
555     }
556
557     @Override
558     public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>>
559                                                                                       requestContext) {
560         return new MultiMsgCollectorImpl<>(this, requestContext);
561     }
562
563     @Override
564     public void updatePacketInRateLimit(final long upperBound) {
565         packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound),
566                 (int) (HIGH_WATERMARK_FACTOR * upperBound));
567     }
568
569     @Override
570     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
571         this.extensionConverterProvider = extensionConverterProvider;
572     }
573
574     @Override
575     public ExtensionConverterProvider getExtensionConverterProvider() {
576         return extensionConverterProvider;
577     }
578
579     @VisibleForTesting
580     TransactionChainManager getTransactionChainManager() {
581         return transactionChainManager;
582     }
583
584     @Override
585     public ListenableFuture<?> closeServiceInstance() {
586         final ListenableFuture<?> listenableFuture = initialized.get()
587                 ? transactionChainManager.deactivateTransactionManager()
588                 : Futures.immediateFuture(null);
589
590         hashedWheelTimer.newTimeout(timerTask -> {
591             if (!listenableFuture.isDone() && !listenableFuture.isCancelled()) {
592                 listenableFuture.cancel(true);
593             }
594         }, TX_CHAIN_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS);
595
596         return listenableFuture;
597     }
598
599     @Override
600     public DeviceInfo getDeviceInfo() {
601         return deviceInfo;
602     }
603
604     @Override
605     public void registerMastershipWatcher(@NonNull final ContextChainMastershipWatcher newWatcher) {
606         contextChainMastershipWatcher = newWatcher;
607     }
608
609     @Override
610     public ServiceGroupIdentifier getIdentifier() {
611         return deviceInfo.getServiceIdentifier();
612     }
613
614     @Override
615     public void acquireWriteTransactionLock() {
616         transactionChainManager.acquireWriteTransactionLock();
617     }
618
619     @Override
620     public void releaseWriteTransactionLock() {
621         transactionChainManager.releaseWriteTransactionLock();
622     }
623
624     @Override
625     public void close() {
626         // Close all datastore registries and transactions
627         if (initialized.getAndSet(false)) {
628             deviceGroupRegistry.close();
629             deviceFlowRegistry.close();
630             deviceMeterRegistry.close();
631
632             final ListenableFuture<?> txChainShuttingDown = transactionChainManager.shuttingDown();
633
634             Futures.addCallback(txChainShuttingDown, new FutureCallback<Object>() {
635                 @Override
636                 public void onSuccess(final Object result) {
637                     transactionChainManager.close();
638                     transactionChainManager = null;
639                 }
640
641                 @Override
642                 public void onFailure(final Throwable throwable) {
643                     transactionChainManager.close();
644                     transactionChainManager = null;
645                 }
646             }, MoreExecutors.directExecutor());
647         }
648
649         requestContexts.forEach(requestContext -> RequestContextUtil
650                 .closeRequestContextWithRpcError(requestContext, "Connection closed."));
651         requestContexts.clear();
652     }
653
654     @Override
655     public boolean canUseSingleLayerSerialization() {
656         return useSingleLayerSerialization && OFConstants.OFP_VERSION_1_3.compareTo(getDeviceInfo().getVersion()) <= 0;
657     }
658
659     @Override
660     public void instantiateServiceInstance() {
661         lazyTransactionManagerInitialization();
662     }
663
664     // TODO: exception handling should be fixed by using custom checked exception, never RuntimeExceptions
665     @Override
666     @SuppressWarnings({"checkstyle:IllegalCatch"})
667     public void initializeDevice() {
668         LOG.debug("Device initialization started for device {}", deviceInfo);
669         try {
670             final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
671                     .retrieveAndClearPortStatusMessages();
672             portStatusMessages.forEach(this::writePortStatusMessage);
673             submitTransaction();
674         } catch (final Exception ex) {
675             throw new IllegalStateException(String.format("Error processing port status messages from device %s: %s",
676                     deviceInfo.toString(), ex.toString()), ex);
677         }
678
679         final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
680                 .lookup(deviceInfo.getVersion());
681
682         if (initializer.isPresent()) {
683             final Future<Void> initialize = initializer.orElseThrow()
684                     .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor);
685
686             try {
687                 initialize.get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
688             } catch (TimeoutException ex) {
689                 initialize.cancel(true);
690                 throw new IllegalStateException(String.format("Failed to initialize device %s in %ss: %s",
691                         deviceInfo.toString(), String.valueOf(DEVICE_INIT_TIMEOUT / 1000), ex.toString()), ex);
692             } catch (ExecutionException | InterruptedException ex) {
693                 throw new IllegalStateException(
694                         String.format("Device %s cannot be initialized: %s", deviceInfo.toString(), ex.toString()), ex);
695             }
696         } else {
697             throw new IllegalStateException(String.format("Unsupported version %s for device %s",
698                     deviceInfo.getVersion(),
699                     deviceInfo.toString()));
700         }
701
702         final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill =
703                 getDeviceFlowRegistry().fill();
704         Futures.addCallback(deviceFlowRegistryFill,
705                 new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher),
706                 MoreExecutors.directExecutor());
707     }
708
709     @VisibleForTesting
710     void lazyTransactionManagerInitialization() {
711         if (!initialized.get()) {
712             LOG.debug("Transaction chain manager for node {} created", deviceInfo);
713             final NodeId nodeId = deviceInfo.getNodeId();
714             transactionChainManager = new TransactionChainManager(dataBroker, nodeId.getValue());
715             history = new FlowGroupInfoHistoryImpl(FLOWGROUP_CACHE_SIZE);
716             deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker,
717                     deviceInfo.getNodeInstanceIdentifier(), history);
718             deviceGroupRegistry = new DeviceGroupRegistryImpl(history);
719             deviceMeterRegistry = new DeviceMeterRegistryImpl();
720         }
721
722         transactionChainManager.activateTransactionManager();
723         initialized.set(true);
724     }
725
726     @Override
727     public <T> RequestContext<T> createRequestContext() {
728         final Uint32 xid = deviceInfo.reserveXidForDeviceMessage();
729
730         final AbstractRequestContext<T> abstractRequestContext = new AbstractRequestContext<>(xid) {
731             @Override
732             public void close() {
733                 requestContexts.remove(this);
734             }
735         };
736
737         requestContexts.add(abstractRequestContext);
738         return abstractRequestContext;
739     }
740
741     @Override
742     public void onStateAcquired(final ContextChainState state) {
743         hasState.set(true);
744     }
745
746     @Override
747     public FlowGroupInfoHistory getFlowGroupInfoHistory() {
748         return history;
749     }
750
751     private class DeviceFlowRegistryCallback implements FutureCallback<List<Optional<FlowCapableNode>>> {
752         private final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill;
753         private final ContextChainMastershipWatcher contextChainMastershipWatcher;
754
755         DeviceFlowRegistryCallback(
756                 final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill,
757                 final ContextChainMastershipWatcher contextChainMastershipWatcher) {
758             this.deviceFlowRegistryFill = deviceFlowRegistryFill;
759             this.contextChainMastershipWatcher = contextChainMastershipWatcher;
760         }
761
762         @Override
763         public void onSuccess(final List<Optional<FlowCapableNode>> result) {
764             if (LOG.isDebugEnabled()) {
765                 // Count all flows we read from datastore for debugging purposes.
766                 // This number do not always represent how many flows were actually added
767                 // to DeviceFlowRegistry, because of possible duplicates.
768                 long flowCount = 0;
769                 if (result != null) {
770                     for (Optional<FlowCapableNode> optNode : result) {
771                         if (optNode.isPresent()) {
772                             flowCount += optNode.orElseThrow().nonnullTable().values().stream()
773                                     .filter(Objects::nonNull)
774                                     .flatMap(table -> table.nonnullFlow().values().stream())
775                                     .filter(Objects::nonNull)
776                                     .count();
777                         }
778                     }
779                 }
780
781                 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo);
782             }
783         }
784
785         @Override
786         public void onFailure(final Throwable throwable) {
787             if (deviceFlowRegistryFill.isCancelled()) {
788                 if (LOG.isDebugEnabled()) {
789                     LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo);
790                 }
791             } else {
792                 LOG.warn("Failed filling flow registry with flows for node: {}", deviceInfo, throwable);
793             }
794             contextChainMastershipWatcher.onNotAbleToStartMastership(
795                     deviceInfo,
796                     "Was not able to fill flow registry on device",
797                     false);
798         }
799     }
800 }