OPNFLWPLUG-1071 : Removal of javax.annotation.Nonnull and replacement of javax.annot...
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import io.netty.util.HashedWheelTimer;
17 import java.util.Collection;
18 import java.util.List;
19 import java.util.Objects;
20 import java.util.Optional;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.eclipse.jdt.annotation.Nullable;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
31 import org.opendaylight.mdsal.binding.api.ReadTransaction;
32 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
33 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
34 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
35 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
36 import org.opendaylight.openflowplugin.api.OFConstants;
37 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
41 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
42 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
43 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
44 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
45 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
46 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder;
48 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
49 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
50 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
51 import org.opendaylight.openflowplugin.api.openflow.lifecycle.DeviceInitializationContext;
52 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
53 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
54 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
55 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
56 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
57 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
58 import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
59 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
60 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
61 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
62 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
63 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
64 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
65 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
66 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
67 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
68 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
69 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
70 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
71 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
72 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
73 import org.opendaylight.openflowplugin.impl.util.MatchUtil;
74 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
75 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.Match;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketIn;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
101 import org.opendaylight.yangtools.util.concurrent.NotificationManager;
102 import org.opendaylight.yangtools.yang.binding.DataContainer;
103 import org.opendaylight.yangtools.yang.binding.DataObject;
104 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
105 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
106 import org.slf4j.Logger;
107 import org.slf4j.LoggerFactory;
108
109 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper, DeviceInitializationContext {
110
    // Class logger
    private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
    // Separate logger, looked up by the name "OfEventLog", used for device event records
    private static final Logger OF_EVENT_LOG = LoggerFactory.getLogger("OfEventLog");

    // Drain factor handed to the packet-in rate limiter
    // TODO: drain factor should be parametrized
    private static final float REJECTED_DRAIN_FACTOR = 0.25f;
    // Fraction of the upper bound used as the low water mark when the packet-in rate limit is updated
    // TODO: low water mark factor should be parametrized
    private static final float LOW_WATERMARK_FACTOR = 0.75f;
    // Fraction of the upper bound used as the high water mark when the packet-in rate limit is updated
    // TODO: high water mark factor should be parametrized
    private static final float HIGH_WATERMARK_FACTOR = 0.95f;

    // Timeout in milliseconds after which we give up on initializing the device
    private static final int DEVICE_INIT_TIMEOUT = 9000;

    // Timeout in milliseconds after which we give up on closing the transaction chain
    private static final int TX_CHAIN_CLOSE_TIMEOUT = 10000;

    // Initial low/high water marks of the packet-in rate limiter
    private static final int LOW_WATERMARK = 1000;
    private static final int HIGH_WATERMARK = 2000;

    private final MultipartWriterProvider writerProvider;
    private final HashedWheelTimer hashedWheelTimer;
    private final DeviceState deviceState;
    private final DataBroker dataBroker;
    // Concurrent set of request contexts
    private final Collection<RequestContext<?>> requestContexts = ConcurrentHashMap.newKeySet();
    private final MessageSpy messageSpy;
    // Translators looked up once, in the constructor, for the device's OpenFlow version
    private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
    private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
    private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight
            .flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
    private final TranslatorLibrary translatorLibrary;
    private final ConvertorExecutor convertorExecutor;
    private final DeviceInitializerProvider deviceInitializerProvider;
    private final PacketInRateLimiter packetInLimiter;
    private final DeviceInfo deviceInfo;
    private final ConnectionContext primaryConnectionContext;
    private final boolean skipTableFeatures;
    private final boolean switchFeaturesMandatory;
    private final boolean isFlowRemovedNotificationOn;
    private final boolean useSingleLayerSerialization;
    // Gates every use of transactionChainManager and the registries in this class
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private final AtomicBoolean hasState = new AtomicBoolean(false);
    // Result of the last initial transaction submit; reported by isTransactionsEnabled()
    private final AtomicBoolean isInitialTransactionSubmitted = new AtomicBoolean(false);
    private final ContextChainHolder contextChainHolder;
    // The following members are not final: they are assigned after construction
    private NotificationPublishService notificationPublishService;
    private TransactionChainManager transactionChainManager;
    private DeviceFlowRegistry deviceFlowRegistry;
    private DeviceGroupRegistry deviceGroupRegistry;
    private DeviceMeterRegistry deviceMeterRegistry;
    private ExtensionConverterProvider extensionConverterProvider;
    private ContextChainMastershipWatcher contextChainMastershipWatcher;
    // Receives port status write tasks, submitted under the device's datapath id
    private final NotificationManager<String, Runnable> queuedNotificationManager;
    private final boolean isStatisticsPollingOn;
163
164     DeviceContextImpl(@NonNull final ConnectionContext primaryConnectionContext,
165                       @NonNull final DataBroker dataBroker,
166                       @NonNull final MessageSpy messageSpy,
167                       @NonNull final TranslatorLibrary translatorLibrary,
168                       final ConvertorExecutor convertorExecutor,
169                       final boolean skipTableFeatures,
170                       final HashedWheelTimer hashedWheelTimer,
171                       final boolean useSingleLayerSerialization,
172                       final DeviceInitializerProvider deviceInitializerProvider,
173                       final boolean isFlowRemovedNotificationOn,
174                       final boolean switchFeaturesMandatory,
175                       final ContextChainHolder contextChainHolder,
176                       final NotificationManager queuedNotificationManager,
177                       final boolean isStatisticsPollingOn) {
178         this.primaryConnectionContext = primaryConnectionContext;
179         this.deviceInfo = primaryConnectionContext.getDeviceInfo();
180         this.hashedWheelTimer = hashedWheelTimer;
181         this.deviceInitializerProvider = deviceInitializerProvider;
182         this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
183         this.switchFeaturesMandatory = switchFeaturesMandatory;
184         this.deviceState = new DeviceStateImpl();
185         this.dataBroker = dataBroker;
186         this.messageSpy = messageSpy;
187         this.isStatisticsPollingOn = isStatisticsPollingOn;
188         this.contextChainHolder = contextChainHolder;
189
190         this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
191                 /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
192
193         this.translatorLibrary = translatorLibrary;
194         this.portStatusTranslator = translatorLibrary.lookupTranslator(
195                 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
196         this.packetInTranslator = translatorLibrary.lookupTranslator(
197                 new TranslatorKey(deviceInfo.getVersion(), org.opendaylight.yang.gen.v1.urn.opendaylight.openflow
198                         .protocol.rev130731
199                         .PacketIn.class.getName()));
200         this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
201                 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
202
203         this.convertorExecutor = convertorExecutor;
204         this.skipTableFeatures = skipTableFeatures;
205         this.useSingleLayerSerialization = useSingleLayerSerialization;
206         this.queuedNotificationManager = queuedNotificationManager;
207         writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
208     }
209
210     @Override
211     public boolean initialSubmitTransaction() {
212         if (!initialized.get()) {
213             return false;
214         }
215
216         final boolean initialSubmit = transactionChainManager.initialSubmitWriteTransaction();
217         isInitialTransactionSubmitted.set(initialSubmit);
218         return initialSubmit;
219     }
220
221     @Override
222     public DeviceState getDeviceState() {
223         return deviceState;
224     }
225
226     @Override
227     public ReadTransaction getReadTransaction() {
228         return dataBroker.newReadOnlyTransaction();
229     }
230
231     @Override
232     public boolean isTransactionsEnabled() {
233         return isInitialTransactionSubmitted.get();
234     }
235
236     @Override
237     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
238                                                           final InstanceIdentifier<T> path,
239                                                           final T data) {
240         if (initialized.get()) {
241             transactionChainManager.writeToTransaction(store, path, data, false);
242         }
243     }
244
245     @Override
246     public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
247                                                                          final InstanceIdentifier<T> path,
248                                                                          final T data) {
249         if (initialized.get()) {
250             transactionChainManager.writeToTransaction(store, path, data, true);
251         }
252     }
253
254     @Override
255     public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store,
256                                                           final InstanceIdentifier<T> path) {
257         if (initialized.get()) {
258             transactionChainManager.addDeleteOperationToTxChain(store, path);
259         }
260     }
261
262     @Override
263     public boolean submitTransaction() {
264         return initialized.get() && transactionChainManager.submitTransaction();
265     }
266
267     @Override
268     public boolean syncSubmitTransaction() {
269         return initialized.get() && transactionChainManager.submitTransaction(true);
270     }
271
272     @Override
273     public ConnectionContext getPrimaryConnectionContext() {
274         return primaryConnectionContext;
275     }
276
277     @Override
278     public DeviceFlowRegistry getDeviceFlowRegistry() {
279         return deviceFlowRegistry;
280     }
281
282     @Override
283     public DeviceGroupRegistry getDeviceGroupRegistry() {
284         return deviceGroupRegistry;
285     }
286
287     @Override
288     public boolean isStatisticsPollingOn() {
289         return isStatisticsPollingOn;
290     }
291
292     @Override
293     public DeviceMeterRegistry getDeviceMeterRegistry() {
294         return deviceMeterRegistry;
295     }
296
297     @Override
298     public void processReply(final OfHeader ofHeader) {
299         messageSpy.spyMessage(
300                 ofHeader.implementedInterface(),
301                 ofHeader instanceof Error
302                         ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
303                         : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
304     }
305
306     @Override
307     public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
308         ofHeaderList.forEach(header -> messageSpy.spyMessage(
309                 header.implementedInterface(),
310                 header instanceof Error
311                         ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
312                         : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
313     }
314
315     @Override
316     public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
317         if (isMasterOfDevice()) {
318             //1. translate to general flow (table, priority, match, cookie)
319             final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819
320                     .FlowRemoved flowRemovedNotification = flowRemovedTranslator
321                     .translate(flowRemoved, deviceInfo, null);
322
323             if (isFlowRemovedNotificationOn) {
324                 // Trigger off a notification
325                 notificationPublishService.offerNotification(flowRemovedNotification);
326             }
327         } else {
328             LOG.debug("Controller is not owner of the device {}, skipping Flow Removed message",
329                     deviceInfo.getLOGValue());
330         }
331     }
332
333     @Override
334     @SuppressWarnings("checkstyle:IllegalCatch")
335     public void processPortStatusMessage(final PortStatusMessage portStatus) {
336         messageSpy.spyMessage(portStatus.implementedInterface(), MessageSpy.StatisticsGroup
337                 .FROM_SWITCH_PUBLISHED_SUCCESS);
338
339         if (initialized.get()) {
340             try {
341                 writePortStatusMessage(portStatus);
342             } catch (final Exception e) {
343                 LOG.warn("Error processing port status message for port {} on device {}",
344                         portStatus.getPortNo(), getDeviceInfo(), e);
345             }
346         } else if (!hasState.get()) {
347             primaryConnectionContext.handlePortStatusMessage(portStatus);
348         }
349     }
350
351     @SuppressWarnings("checkstyle:IllegalCatch")
352     private void writePortStatusMessage(final PortStatus portStatusMessage) {
353         String datapathId = deviceInfo.getDatapathId().toString().intern();
354         queuedNotificationManager.submitNotification(datapathId, () -> {
355             try {
356                 final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
357                         .translate(portStatusMessage, getDeviceInfo(), null);
358                 OF_EVENT_LOG.debug("Node Connector Status, Node: {}, PortNumber: {}, PortName: {}, Reason: {}",
359                         deviceInfo.getDatapathId(), portStatusMessage.getPortNo(), portStatusMessage.getName(),
360                         portStatusMessage.getReason());
361
362                 final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
363                         .getNodeInstanceIdentifier()
364                         .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
365                                 .nodeConnectorIdfromDatapathPortNo(
366                                         deviceInfo.getDatapathId(),
367                                         portStatusMessage.getPortNo(),
368                                         OpenflowVersion.get(deviceInfo.getVersion()))));
369
370                 writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
371                         .withKey(iiToNodeConnector.getKey())
372                         .addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new
373                                 FlowCapableNodeConnectorStatisticsDataBuilder().build())
374                         .addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector)
375                         .build());
376                 syncSubmitTransaction();
377                 if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
378                     addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
379                     syncSubmitTransaction();
380                 }
381             } catch (final Exception e) {
382                 LOG.warn("Error processing port status message for port {} on device {}",
383                         portStatusMessage.getPortNo(), datapathId, e);
384             }
385         });
386     }
387
388     @Override
389     public void processPacketInMessage(final PacketInMessage packetInMessage) {
390         if (isMasterOfDevice()) {
391             final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
392             handlePacketInMessage(packetReceived, packetInMessage.implementedInterface(), packetReceived.getMatch());
393         } else {
394             LOG.debug("Controller is not owner of the device {}, skipping packet_in message", deviceInfo.getLOGValue());
395         }
396     }
397
398     private Boolean isMasterOfDevice() {
399         final ContextChain contextChain = contextChainHolder.getContextChain(deviceInfo);
400         boolean result = false;
401         if (contextChain != null) {
402             result = contextChain.isMastered(ContextChainMastershipState.CHECK, false);
403         }
404         return result;
405     }
406
407     private void handlePacketInMessage(final PacketIn packetIn,
408                                        final Class<?> implementedInterface,
409                                        final Match match) {
410         messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH);
411         final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
412
413         if (packetIn == null) {
414             LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
415             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
416             return;
417         }
418
419         final OpenflowVersion openflowVersion = OpenflowVersion.get(deviceInfo.getVersion());
420
421         // Try to get ingress from match
422         final NodeConnectorRef nodeConnectorRef = packetIn.getIngress() != null
423                 ? packetIn.getIngress() : Optional.ofNullable(match)
424                 .map(Match::getInPort)
425                 .map(nodeConnectorId -> InventoryDataServiceUtil
426                         .portNumberfromNodeConnectorId(
427                                 openflowVersion,
428                                 nodeConnectorId))
429                 .map(portNumber -> InventoryDataServiceUtil
430                         .nodeConnectorRefFromDatapathIdPortno(
431                                 deviceInfo.getDatapathId(),
432                                 portNumber,
433                                 openflowVersion))
434                 .orElse(null);
435
436         messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
437
438         if (!packetInLimiter.acquirePermit()) {
439             LOG.debug("Packet limited");
440             // TODO: save packet into emergency slot if possible
441             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
442                     .FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
443             return;
444         }
445
446         final ListenableFuture<?> offerNotification = notificationPublishService
447                 .offerNotification(new PacketReceivedBuilder(packetIn)
448                         .setIngress(nodeConnectorRef)
449                         .setMatch(MatchUtil.transformMatch(match,
450                                 org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.packet.received
451                                         .Match.class))
452                         .build());
453
454         if (NotificationPublishService.REJECTED.equals(offerNotification)) {
455             LOG.debug("notification offer rejected");
456             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
457             packetInLimiter.drainLowWaterMark();
458             packetInLimiter.releasePermit();
459             return;
460         }
461
462         Futures.addCallback(offerNotification, new FutureCallback<Object>() {
463             @Override
464             public void onSuccess(final Object result) {
465                 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
466                 packetInLimiter.releasePermit();
467             }
468
469             @Override
470             public void onFailure(final Throwable throwable) {
471                 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
472                         .FROM_SWITCH_NOTIFICATION_REJECTED);
473                 LOG.debug("notification offer failed: {}", throwable.getMessage());
474                 LOG.trace("notification offer failed..", throwable);
475                 packetInLimiter.releasePermit();
476             }
477         }, MoreExecutors.directExecutor());
478     }
479
480     @Override
481     public void processExperimenterMessage(final ExperimenterMessage notification) {
482         if (isMasterOfDevice()) {
483             // lookup converter
484             final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
485             final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
486                     getDeviceInfo().getVersion(),
487                     (Class<? extends ExperimenterDataOfChoice>) vendorData.implementedInterface());
488             final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter =
489                     extensionConverterProvider.getMessageConverter(key);
490             if (messageConverter == null) {
491                 LOG.warn("custom converter for {}[OF:{}] not found",
492                         notification.getExperimenterDataOfChoice().implementedInterface(),
493                         getDeviceInfo().getVersion());
494                 return;
495             }
496             // build notification
497             final ExperimenterMessageOfChoice messageOfChoice;
498             messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
499             final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new
500                     ExperimenterMessageFromDevBuilder()
501                     .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
502                     .setExperimenterMessageOfChoice(messageOfChoice);
503             // publish
504             notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
505         } else {
506             LOG.debug("Controller is not owner of the device {}, skipping experimenter message",
507                     deviceInfo.getLOGValue());
508         }
509     }
510
511     @Override
512     // The cast to PacketInMessage is safe as the implemented interface is verified before the cas tbut FB doesn't
513     // recognize it.
514     @SuppressFBWarnings("BC_UNCONFIRMED_CAST")
515     public boolean processAlienMessage(final OfHeader message) {
516         final Class<? extends DataContainer> implementedInterface = message.implementedInterface();
517
518         if (org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketInMessage.class
519                 .equals(implementedInterface)) {
520             final org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
521                     .PacketInMessage packetInMessage = (org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
522                 .rev130709.PacketInMessage) message;
523
524             handlePacketInMessage(packetInMessage, implementedInterface, packetInMessage.getMatch());
525             return true;
526         }
527
528         return false;
529     }
530
531     @Override
532     public TranslatorLibrary oook() {
533         return translatorLibrary;
534     }
535
536     @Override
537     public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
538         this.notificationPublishService = notificationPublishService;
539     }
540
541     @Override
542     public MessageSpy getMessageSpy() {
543         return messageSpy;
544     }
545
546     @Override
547     public void onPublished() {
548         primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
549     }
550
551     @Override
552     public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>>
553                                                                                       requestContext) {
554         return new MultiMsgCollectorImpl<>(this, requestContext);
555     }
556
557     @Override
558     public void updatePacketInRateLimit(final long upperBound) {
559         packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound),
560                 (int) (HIGH_WATERMARK_FACTOR * upperBound));
561     }
562
563     @Override
564     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
565         this.extensionConverterProvider = extensionConverterProvider;
566     }
567
568     @Override
569     public ExtensionConverterProvider getExtensionConverterProvider() {
570         return extensionConverterProvider;
571     }
572
573     @VisibleForTesting
574     TransactionChainManager getTransactionChainManager() {
575         return this.transactionChainManager;
576     }
577
578     @Override
579     public ListenableFuture<?> closeServiceInstance() {
580         final ListenableFuture<?> listenableFuture = initialized.get()
581                 ? transactionChainManager.deactivateTransactionManager()
582                 : Futures.immediateFuture(null);
583
584         hashedWheelTimer.newTimeout((timerTask) -> {
585             if (!listenableFuture.isDone() && !listenableFuture.isCancelled()) {
586                 listenableFuture.cancel(true);
587             }
588         }, TX_CHAIN_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS);
589
590         return listenableFuture;
591     }
592
593     @Override
594     public DeviceInfo getDeviceInfo() {
595         return this.deviceInfo;
596     }
597
598     @Override
599     public void registerMastershipWatcher(@NonNull final ContextChainMastershipWatcher newWatcher) {
600         this.contextChainMastershipWatcher = newWatcher;
601     }
602
603     @NonNull
604     @Override
605     public ServiceGroupIdentifier getIdentifier() {
606         return deviceInfo.getServiceIdentifier();
607     }
608
609     @Override
610     public void close() {
611         // Close all datastore registries and transactions
612         if (initialized.getAndSet(false)) {
613             deviceGroupRegistry.close();
614             deviceFlowRegistry.close();
615             deviceMeterRegistry.close();
616
617             final ListenableFuture<?> txChainShuttingDown = transactionChainManager.shuttingDown();
618
619             Futures.addCallback(txChainShuttingDown, new FutureCallback<Object>() {
620                 @Override
621                 public void onSuccess(final Object result) {
622                     transactionChainManager.close();
623                     transactionChainManager = null;
624                 }
625
626                 @Override
627                 public void onFailure(final Throwable throwable) {
628                     transactionChainManager.close();
629                     transactionChainManager = null;
630                 }
631             }, MoreExecutors.directExecutor());
632         }
633
634         requestContexts.forEach(requestContext -> RequestContextUtil
635                 .closeRequestContextWithRpcError(requestContext, "Connection closed."));
636         requestContexts.clear();
637     }
638
639     @Override
640     public boolean canUseSingleLayerSerialization() {
641         return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
642     }
643
644     @Override
645     public void instantiateServiceInstance() {
646         lazyTransactionManagerInitialization();
647     }
648
649     // TODO: exception handling should be fixed by using custom checked exception, never RuntimeExceptions
650     @Override
651     @SuppressWarnings({"checkstyle:IllegalCatch"})
652     public void initializeDevice() {
653         LOG.debug("Device initialization started for device {}", deviceInfo);
654         try {
655             final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
656                     .retrieveAndClearPortStatusMessages();
657             portStatusMessages.forEach(this::writePortStatusMessage);
658             submitTransaction();
659         } catch (final Exception ex) {
660             throw new RuntimeException(String.format("Error processing port status messages from device %s: %s",
661                     deviceInfo.toString(), ex.toString()), ex);
662         }
663
664         final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
665                 .lookup(deviceInfo.getVersion());
666
667         if (initializer.isPresent()) {
668             final Future<Void> initialize = initializer
669                     .get()
670                     .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor);
671
672             try {
673                 initialize.get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
674             } catch (TimeoutException ex) {
675                 initialize.cancel(true);
676                 throw new RuntimeException(String.format("Failed to initialize device %s in %ss: %s",
677                         deviceInfo.toString(), String.valueOf(DEVICE_INIT_TIMEOUT / 1000), ex.toString()), ex);
678             } catch (ExecutionException | InterruptedException ex) {
679                 throw new RuntimeException(
680                         String.format("Device %s cannot be initialized: %s", deviceInfo.toString(), ex.toString()), ex);
681             }
682         } else {
683             throw new RuntimeException(String.format("Unsupported version %s for device %s",
684                     deviceInfo.getVersion(),
685                     deviceInfo.toString()));
686         }
687
688         final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill =
689                 getDeviceFlowRegistry().fill();
690         Futures.addCallback(deviceFlowRegistryFill,
691                 new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher),
692                 MoreExecutors.directExecutor());
693     }
694
695     @VisibleForTesting
696     void lazyTransactionManagerInitialization() {
697         if (!this.initialized.get()) {
698             if (LOG.isDebugEnabled()) {
699                 LOG.debug("Transaction chain manager for node {} created", deviceInfo);
700             }
701             this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo.getNodeId().getValue());
702             this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker,
703                     deviceInfo.getNodeInstanceIdentifier());
704             this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
705             this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
706         }
707
708         transactionChainManager.activateTransactionManager();
709         initialized.set(true);
710     }
711
712     @Nullable
713     @Override
714     public <T> RequestContext<T> createRequestContext() {
715         final Long xid = deviceInfo.reserveXidForDeviceMessage();
716
717         final AbstractRequestContext<T> abstractRequestContext = new AbstractRequestContext<T>(xid) {
718             @Override
719             public void close() {
720                 requestContexts.remove(this);
721             }
722         };
723
724         requestContexts.add(abstractRequestContext);
725         return abstractRequestContext;
726     }
727
728     @Override
729     public void onStateAcquired(final ContextChainState state) {
730         hasState.set(true);
731     }
732
733     private class DeviceFlowRegistryCallback implements FutureCallback<List<Optional<FlowCapableNode>>> {
734         private final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill;
735         private final ContextChainMastershipWatcher contextChainMastershipWatcher;
736
737         DeviceFlowRegistryCallback(
738                 ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill,
739                 ContextChainMastershipWatcher contextChainMastershipWatcher) {
740             this.deviceFlowRegistryFill = deviceFlowRegistryFill;
741             this.contextChainMastershipWatcher = contextChainMastershipWatcher;
742         }
743
744         @Override
745         public void onSuccess(List<Optional<FlowCapableNode>> result) {
746             if (LOG.isDebugEnabled()) {
747                 // Count all flows we read from datastore for debugging purposes.
748                 // This number do not always represent how many flows were actually added
749                 // to DeviceFlowRegistry, because of possible duplicates.
750                 long flowCount = 0;
751                 if (result != null) {
752                     for (Optional<FlowCapableNode> optNode : result) {
753                         if (optNode.isPresent()) {
754                             flowCount += optNode.get().nonnullTable().stream()
755                                     .filter(Objects::nonNull)
756                                     .flatMap(table -> table.nonnullFlow().stream())
757                                     .filter(Objects::nonNull)
758                                     .count();
759                         }
760                     }
761                 }
762
763                 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo);
764             }
765         }
766
767         @Override
768         public void onFailure(Throwable throwable) {
769             if (deviceFlowRegistryFill.isCancelled()) {
770                 if (LOG.isDebugEnabled()) {
771                     LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo);
772                 }
773             } else {
774                 LOG.warn("Failed filling flow registry with flows for node: {}", deviceInfo, throwable);
775             }
776             contextChainMastershipWatcher.onNotAbleToStartMastership(
777                     deviceInfo,
778                     "Was not able to fill flow registry on device",
779                     false);
780         }
781     }
782
783 }