f8fc0f91e87353a64c830e5c09be3cec34c7020a
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import io.netty.util.HashedWheelTimer;
17 import java.util.Collection;
18 import java.util.Collections;
19 import java.util.List;
20 import java.util.Objects;
21 import java.util.Optional;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27 import java.util.concurrent.atomic.AtomicBoolean;
28 import javax.annotation.Nonnull;
29 import javax.annotation.Nullable;
30 import org.opendaylight.mdsal.binding.api.DataBroker;
31 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
32 import org.opendaylight.mdsal.binding.api.ReadTransaction;
33 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
34 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
35 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
36 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
37 import org.opendaylight.openflowplugin.api.OFConstants;
38 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
42 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
43 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
44 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
45 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
46 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
48 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder;
49 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
50 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
51 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
52 import org.opendaylight.openflowplugin.api.openflow.lifecycle.DeviceInitializationContext;
53 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
54 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
55 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
56 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
57 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
58 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
59 import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
60 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
61 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
62 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
63 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
64 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
65 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
66 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
67 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
68 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
69 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
70 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
71 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
72 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
73 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
74 import org.opendaylight.openflowplugin.impl.util.MatchUtil;
75 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
76 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.Match;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketIn;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
102 import org.opendaylight.yangtools.yang.binding.DataContainer;
103 import org.opendaylight.yangtools.yang.binding.DataObject;
104 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
105 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
106 import org.slf4j.Logger;
107 import org.slf4j.LoggerFactory;
108
109 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper, DeviceInitializationContext {
110
    // General-purpose logger of this class.
    private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
    // Separate logger registered under the name "OfEventLog"; used for node connector status records.
    private static final Logger OF_EVENT_LOG = LoggerFactory.getLogger("OfEventLog");

    // Drain factor handed to the packet-in rate limiter at construction time.
    // TODO: drain factor should be parametrized
    private static final float REJECTED_DRAIN_FACTOR = 0.25f;
    // Fraction of the requested upper bound that becomes the limiter's low water mark.
    // TODO: low water mark factor should be parametrized
    private static final float LOW_WATERMARK_FACTOR = 0.75f;
    // Fraction of the requested upper bound that becomes the limiter's high water mark.
    // TODO: high water mark factor should be parametrized
    private static final float HIGH_WATERMARK_FACTOR = 0.95f;

    // Timeout in milliseconds after which we give up on initializing the device
    private static final int DEVICE_INIT_TIMEOUT = 9000;

    // Timeout in milliseconds after which we give up on closing the transaction chain
    private static final int TX_CHAIN_CLOSE_TIMEOUT = 10000;

    // Initial water marks of the packet-in rate limiter, used until updatePacketInRateLimit() changes them.
    private static final int LOW_WATERMARK = 1000;
    private static final int HIGH_WATERMARK = 2000;

    private final MultipartWriterProvider writerProvider;
    // Timer used to schedule the transaction chain close timeout.
    private final HashedWheelTimer hashedWheelTimer;
    private final DeviceState deviceState;
    private final DataBroker dataBroker;
    // Concurrent set of request contexts; close() fails each of them with an RPC error and clears the set.
    private final Collection<RequestContext<?>> requestContexts = ConcurrentHashMap.newKeySet();
    private final MessageSpy messageSpy;
    // Translators looked up once in the constructor for the device's OpenFlow version.
    private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
    private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
    private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight
            .flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
    private final TranslatorLibrary translatorLibrary;
    private final ConvertorExecutor convertorExecutor;
    private final DeviceInitializerProvider deviceInitializerProvider;
    private final PacketInRateLimiter packetInLimiter;
    private final DeviceInfo deviceInfo;
    private final ConnectionContext primaryConnectionContext;
    private final boolean skipTableFeatures;
    private final boolean switchFeaturesMandatory;
    // When false, translated flow-removed messages are not published as notifications.
    private final boolean isFlowRemovedNotificationOn;
    private final boolean useSingleLayerSerialization;
    // Gates every use of transactionChainManager and the registries; flipped back to false by close().
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    // Read when a port status message arrives before initialization, to decide whether to hand it to the
    // connection context.
    private final AtomicBoolean hasState = new AtomicBoolean(false);
    // Holds the outcome of the last initialSubmitTransaction() call; reported by isTransactionsEnabled().
    private final AtomicBoolean isInitialTransactionSubmitted = new AtomicBoolean(false);
    // Used to look up this device's context chain when checking mastership.
    private final ContextChainHolder contextChainHolder;
    // Injected through setNotificationPublishService(); null until then.
    private NotificationPublishService notificationPublishService;
    // Reset to null once close() has shut the manager down.
    private TransactionChainManager transactionChainManager;
    private DeviceFlowRegistry deviceFlowRegistry;
    private DeviceGroupRegistry deviceGroupRegistry;
    private DeviceMeterRegistry deviceMeterRegistry;
    // Injected through setExtensionConverterProvider(); null until then.
    private ExtensionConverterProvider extensionConverterProvider;
    // Assigned by registerMastershipWatcher().
    private ContextChainMastershipWatcher contextChainMastershipWatcher;
161
162     DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
163                       @Nonnull final DataBroker dataBroker,
164                       @Nonnull final MessageSpy messageSpy,
165                       @Nonnull final TranslatorLibrary translatorLibrary,
166                       final ConvertorExecutor convertorExecutor,
167                       final boolean skipTableFeatures,
168                       final HashedWheelTimer hashedWheelTimer,
169                       final boolean useSingleLayerSerialization,
170                       final DeviceInitializerProvider deviceInitializerProvider,
171                       final boolean isFlowRemovedNotificationOn,
172                       final boolean switchFeaturesMandatory,
173                       final ContextChainHolder contextChainHolder) {
174
175         this.primaryConnectionContext = primaryConnectionContext;
176         this.deviceInfo = primaryConnectionContext.getDeviceInfo();
177         this.hashedWheelTimer = hashedWheelTimer;
178         this.deviceInitializerProvider = deviceInitializerProvider;
179         this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
180         this.switchFeaturesMandatory = switchFeaturesMandatory;
181         this.deviceState = new DeviceStateImpl();
182         this.dataBroker = dataBroker;
183         this.messageSpy = messageSpy;
184         this.contextChainHolder = contextChainHolder;
185
186         this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
187                 /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
188
189         this.translatorLibrary = translatorLibrary;
190         this.portStatusTranslator = translatorLibrary.lookupTranslator(
191                 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
192         this.packetInTranslator = translatorLibrary.lookupTranslator(
193                 new TranslatorKey(deviceInfo.getVersion(), org.opendaylight.yang.gen.v1.urn.opendaylight.openflow
194                         .protocol.rev130731
195                         .PacketIn.class.getName()));
196         this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
197                 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
198
199         this.convertorExecutor = convertorExecutor;
200         this.skipTableFeatures = skipTableFeatures;
201         this.useSingleLayerSerialization = useSingleLayerSerialization;
202         writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
203     }
204
205     @Override
206     public boolean initialSubmitTransaction() {
207         if (!initialized.get()) {
208             return false;
209         }
210
211         final boolean initialSubmit = transactionChainManager.initialSubmitWriteTransaction();
212         isInitialTransactionSubmitted.set(initialSubmit);
213         return initialSubmit;
214     }
215
216     @Override
217     public DeviceState getDeviceState() {
218         return deviceState;
219     }
220
221     @Override
222     public ReadTransaction getReadTransaction() {
223         return dataBroker.newReadOnlyTransaction();
224     }
225
226     @Override
227     public boolean isTransactionsEnabled() {
228         return isInitialTransactionSubmitted.get();
229     }
230
231     @Override
232     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
233                                                           final InstanceIdentifier<T> path,
234                                                           final T data) {
235         if (initialized.get()) {
236             transactionChainManager.writeToTransaction(store, path, data, false);
237         }
238     }
239
240     @Override
241     public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
242                                                                          final InstanceIdentifier<T> path,
243                                                                          final T data) {
244         if (initialized.get()) {
245             transactionChainManager.writeToTransaction(store, path, data, true);
246         }
247     }
248
249     @Override
250     public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store,
251                                                           final InstanceIdentifier<T> path) {
252         if (initialized.get()) {
253             transactionChainManager.addDeleteOperationToTxChain(store, path);
254         }
255     }
256
257     @Override
258     public boolean submitTransaction() {
259         return initialized.get() && transactionChainManager.submitTransaction();
260     }
261
262     @Override
263     public boolean syncSubmitTransaction() {
264         return initialized.get() && transactionChainManager.submitTransaction(true);
265     }
266
267     @Override
268     public ConnectionContext getPrimaryConnectionContext() {
269         return primaryConnectionContext;
270     }
271
272     @Override
273     public DeviceFlowRegistry getDeviceFlowRegistry() {
274         return deviceFlowRegistry;
275     }
276
277     @Override
278     public DeviceGroupRegistry getDeviceGroupRegistry() {
279         return deviceGroupRegistry;
280     }
281
282     @Override
283     public DeviceMeterRegistry getDeviceMeterRegistry() {
284         return deviceMeterRegistry;
285     }
286
287     @Override
288     public void processReply(final OfHeader ofHeader) {
289         messageSpy.spyMessage(
290                 ofHeader.implementedInterface(),
291                 ofHeader instanceof Error
292                         ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
293                         : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
294     }
295
296     @Override
297     public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
298         ofHeaderList.forEach(header -> messageSpy.spyMessage(
299                 header.implementedInterface(),
300                 header instanceof Error
301                         ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
302                         : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
303     }
304
305     @Override
306     public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
307         if (isMasterOfDevice()) {
308             //1. translate to general flow (table, priority, match, cookie)
309             final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819
310                     .FlowRemoved flowRemovedNotification = flowRemovedTranslator
311                     .translate(flowRemoved, deviceInfo, null);
312
313             if (isFlowRemovedNotificationOn) {
314                 // Trigger off a notification
315                 notificationPublishService.offerNotification(flowRemovedNotification);
316             }
317         } else {
318             LOG.debug("Controller is not owner of the device {}, skipping Flow Removed message",
319                     deviceInfo.getLOGValue());
320         }
321     }
322
323     @Override
324     @SuppressWarnings("checkstyle:IllegalCatch")
325     public void processPortStatusMessage(final PortStatusMessage portStatus) {
326         messageSpy.spyMessage(portStatus.implementedInterface(), MessageSpy.StatisticsGroup
327                 .FROM_SWITCH_PUBLISHED_SUCCESS);
328
329         if (initialized.get()) {
330             try {
331                 writePortStatusMessage(portStatus);
332             } catch (final Exception e) {
333                 LOG.warn("Error processing port status message for port {} on device {}",
334                         portStatus.getPortNo(), getDeviceInfo(), e);
335             }
336         } else if (!hasState.get()) {
337             primaryConnectionContext.handlePortStatusMessage(portStatus);
338         }
339     }
340
341     private void writePortStatusMessage(final PortStatus portStatusMessage) {
342         final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
343                 .translate(portStatusMessage, getDeviceInfo(), null);
344         OF_EVENT_LOG.debug("Node Connector Status, Node: {}, PortNumber: {}, PortName: {}, Reason: {}",
345                 deviceInfo.getDatapathId(), portStatusMessage.getPortNo(), portStatusMessage.getName(),
346                 portStatusMessage.getReason());
347
348         final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
349                 .getNodeInstanceIdentifier()
350                 .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
351                         .nodeConnectorIdfromDatapathPortNo(
352                                 deviceInfo.getDatapathId(),
353                                 portStatusMessage.getPortNo(),
354                                 OpenflowVersion.get(deviceInfo.getVersion()))));
355
356         writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
357                 .withKey(iiToNodeConnector.getKey())
358                 .addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new
359                         FlowCapableNodeConnectorStatisticsDataBuilder().build())
360                 .addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector)
361                 .build());
362         syncSubmitTransaction();
363         if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
364             addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
365             syncSubmitTransaction();
366         }
367     }
368
369     @Override
370     public void processPacketInMessage(final PacketInMessage packetInMessage) {
371         if (isMasterOfDevice()) {
372             final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
373             handlePacketInMessage(packetReceived, packetInMessage.implementedInterface(), packetReceived.getMatch());
374         } else {
375             LOG.debug("Controller is not owner of the device {}, skipping packet_in message", deviceInfo.getLOGValue());
376         }
377     }
378
379     private Boolean isMasterOfDevice() {
380         final ContextChain contextChain = contextChainHolder.getContextChain(deviceInfo);
381         boolean result = false;
382         if (contextChain != null) {
383             result = contextChain.isMastered(ContextChainMastershipState.CHECK, false);
384         }
385         return result;
386     }
387
388     private void handlePacketInMessage(final PacketIn packetIn,
389                                        final Class<?> implementedInterface,
390                                        final Match match) {
391         messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH);
392         final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
393
394         if (packetIn == null) {
395             LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
396             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
397             return;
398         }
399
400         final OpenflowVersion openflowVersion = OpenflowVersion.get(deviceInfo.getVersion());
401
402         // Try to get ingress from match
403         final NodeConnectorRef nodeConnectorRef = packetIn.getIngress() != null
404                 ? packetIn.getIngress() : Optional.ofNullable(match)
405                 .map(Match::getInPort)
406                 .map(nodeConnectorId -> InventoryDataServiceUtil
407                         .portNumberfromNodeConnectorId(
408                                 openflowVersion,
409                                 nodeConnectorId))
410                 .map(portNumber -> InventoryDataServiceUtil
411                         .nodeConnectorRefFromDatapathIdPortno(
412                                 deviceInfo.getDatapathId(),
413                                 portNumber,
414                                 openflowVersion))
415                 .orElse(null);
416
417         messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
418
419         if (!packetInLimiter.acquirePermit()) {
420             LOG.debug("Packet limited");
421             // TODO: save packet into emergency slot if possible
422             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
423                     .FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
424             return;
425         }
426
427         final ListenableFuture<?> offerNotification = notificationPublishService
428                 .offerNotification(new PacketReceivedBuilder(packetIn)
429                         .setIngress(nodeConnectorRef)
430                         .setMatch(MatchUtil.transformMatch(match,
431                                 org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.packet.received
432                                         .Match.class))
433                         .build());
434
435         if (NotificationPublishService.REJECTED.equals(offerNotification)) {
436             LOG.debug("notification offer rejected");
437             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
438             packetInLimiter.drainLowWaterMark();
439             packetInLimiter.releasePermit();
440             return;
441         }
442
443         Futures.addCallback(offerNotification, new FutureCallback<Object>() {
444             @Override
445             public void onSuccess(final Object result) {
446                 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
447                 packetInLimiter.releasePermit();
448             }
449
450             @Override
451             public void onFailure(final Throwable throwable) {
452                 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
453                         .FROM_SWITCH_NOTIFICATION_REJECTED);
454                 LOG.debug("notification offer failed: {}", throwable.getMessage());
455                 LOG.trace("notification offer failed..", throwable);
456                 packetInLimiter.releasePermit();
457             }
458         }, MoreExecutors.directExecutor());
459     }
460
461     @Override
462     public void processExperimenterMessage(final ExperimenterMessage notification) {
463         if (isMasterOfDevice()) {
464             // lookup converter
465             final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
466             final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
467                     getDeviceInfo().getVersion(),
468                     (Class<? extends ExperimenterDataOfChoice>) vendorData.implementedInterface());
469             final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter =
470                     extensionConverterProvider.getMessageConverter(key);
471             if (messageConverter == null) {
472                 LOG.warn("custom converter for {}[OF:{}] not found",
473                         notification.getExperimenterDataOfChoice().implementedInterface(),
474                         getDeviceInfo().getVersion());
475                 return;
476             }
477             // build notification
478             final ExperimenterMessageOfChoice messageOfChoice;
479             messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
480             final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new
481                     ExperimenterMessageFromDevBuilder()
482                     .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
483                     .setExperimenterMessageOfChoice(messageOfChoice);
484             // publish
485             notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
486         } else {
487             LOG.debug("Controller is not owner of the device {}, skipping experimenter message",
488                     deviceInfo.getLOGValue());
489         }
490     }
491
492     @Override
493     // The cast to PacketInMessage is safe as the implemented interface is verified before the cas tbut FB doesn't
494     // recognize it.
495     @SuppressFBWarnings("BC_UNCONFIRMED_CAST")
496     public boolean processAlienMessage(final OfHeader message) {
497         final Class<? extends DataContainer> implementedInterface = message.implementedInterface();
498
499         if (org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketInMessage.class
500                 .equals(implementedInterface)) {
501             final org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
502                     .PacketInMessage packetInMessage = (org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
503                 .rev130709.PacketInMessage) message;
504
505             handlePacketInMessage(packetInMessage, implementedInterface, packetInMessage.getMatch());
506             return true;
507         }
508
509         return false;
510     }
511
512     @Override
513     public TranslatorLibrary oook() {
514         return translatorLibrary;
515     }
516
517     @Override
518     public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
519         this.notificationPublishService = notificationPublishService;
520     }
521
522     @Override
523     public MessageSpy getMessageSpy() {
524         return messageSpy;
525     }
526
527     @Override
528     public void onPublished() {
529         primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
530     }
531
532     @Override
533     public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>>
534                                                                                       requestContext) {
535         return new MultiMsgCollectorImpl<>(this, requestContext);
536     }
537
538     @Override
539     public void updatePacketInRateLimit(final long upperBound) {
540         packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound),
541                 (int) (HIGH_WATERMARK_FACTOR * upperBound));
542     }
543
544     @Override
545     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
546         this.extensionConverterProvider = extensionConverterProvider;
547     }
548
549     @Override
550     public ExtensionConverterProvider getExtensionConverterProvider() {
551         return extensionConverterProvider;
552     }
553
554     @VisibleForTesting
555     TransactionChainManager getTransactionChainManager() {
556         return this.transactionChainManager;
557     }
558
559     @Override
560     public ListenableFuture<?> closeServiceInstance() {
561         final ListenableFuture<?> listenableFuture = initialized.get()
562                 ? transactionChainManager.deactivateTransactionManager()
563                 : Futures.immediateFuture(null);
564
565         hashedWheelTimer.newTimeout((timerTask) -> {
566             if (!listenableFuture.isDone() && !listenableFuture.isCancelled()) {
567                 listenableFuture.cancel(true);
568             }
569         }, TX_CHAIN_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS);
570
571         return listenableFuture;
572     }
573
574     @Override
575     public DeviceInfo getDeviceInfo() {
576         return this.deviceInfo;
577     }
578
579     @Override
580     public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher newWatcher) {
581         this.contextChainMastershipWatcher = newWatcher;
582     }
583
584     @Nonnull
585     @Override
586     public ServiceGroupIdentifier getIdentifier() {
587         return deviceInfo.getServiceIdentifier();
588     }
589
590     @Override
591     public void close() {
592         // Close all datastore registries and transactions
593         if (initialized.getAndSet(false)) {
594             deviceGroupRegistry.close();
595             deviceFlowRegistry.close();
596             deviceMeterRegistry.close();
597
598             final ListenableFuture<?> txChainShuttingDown = transactionChainManager.shuttingDown();
599
600             Futures.addCallback(txChainShuttingDown, new FutureCallback<Object>() {
601                 @Override
602                 public void onSuccess(final Object result) {
603                     transactionChainManager.close();
604                     transactionChainManager = null;
605                 }
606
607                 @Override
608                 public void onFailure(final Throwable throwable) {
609                     transactionChainManager.close();
610                     transactionChainManager = null;
611                 }
612             }, MoreExecutors.directExecutor());
613         }
614
615         requestContexts.forEach(requestContext -> RequestContextUtil
616                 .closeRequestContextWithRpcError(requestContext, "Connection closed."));
617         requestContexts.clear();
618     }
619
620     @Override
621     public boolean canUseSingleLayerSerialization() {
622         return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
623     }
624
625     @Override
626     public void instantiateServiceInstance() {
627         lazyTransactionManagerInitialization();
628     }
629
630     // TODO: exception handling should be fixed by using custom checked exception, never RuntimeExceptions
631     @Override
632     @SuppressWarnings({"checkstyle:IllegalCatch"})
633     public void initializeDevice() {
634         LOG.debug("Device initialization started for device {}", deviceInfo);
635         try {
636             final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
637                     .retrieveAndClearPortStatusMessages();
638
639             portStatusMessages.forEach(this::writePortStatusMessage);
640             submitTransaction();
641         } catch (final Exception ex) {
642             throw new RuntimeException(String.format("Error processing port status messages from device %s: %s",
643                     deviceInfo.toString(), ex.toString()), ex);
644         }
645
646         final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
647                 .lookup(deviceInfo.getVersion());
648
649         if (initializer.isPresent()) {
650             final Future<Void> initialize = initializer
651                     .get()
652                     .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor);
653
654             try {
655                 initialize.get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
656             } catch (TimeoutException ex) {
657                 initialize.cancel(true);
658                 throw new RuntimeException(String.format("Failed to initialize device %s in %ss: %s",
659                         deviceInfo.toString(), String.valueOf(DEVICE_INIT_TIMEOUT / 1000), ex.toString()), ex);
660             } catch (ExecutionException | InterruptedException ex) {
661                 throw new RuntimeException(
662                         String.format("Device %s cannot be initialized: %s", deviceInfo.toString(), ex.toString()), ex);
663             }
664         } else {
665             throw new RuntimeException(String.format("Unsupported version %s for device %s",
666                     deviceInfo.getVersion(),
667                     deviceInfo.toString()));
668         }
669
670         final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill =
671                 getDeviceFlowRegistry().fill();
672         Futures.addCallback(deviceFlowRegistryFill,
673                 new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher),
674                 MoreExecutors.directExecutor());
675     }
676
677     @VisibleForTesting
678     void lazyTransactionManagerInitialization() {
679         if (!this.initialized.get()) {
680             if (LOG.isDebugEnabled()) {
681                 LOG.debug("Transaction chain manager for node {} created", deviceInfo);
682             }
683             this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo.getNodeId().getValue());
684             this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker,
685                     deviceInfo.getNodeInstanceIdentifier());
686             this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
687             this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
688         }
689
690         transactionChainManager.activateTransactionManager();
691         initialized.set(true);
692     }
693
694     @Nullable
695     @Override
696     public <T> RequestContext<T> createRequestContext() {
697         final Long xid = deviceInfo.reserveXidForDeviceMessage();
698
699         final AbstractRequestContext<T> abstractRequestContext = new AbstractRequestContext<T>(xid) {
700             @Override
701             public void close() {
702                 requestContexts.remove(this);
703             }
704         };
705
706         requestContexts.add(abstractRequestContext);
707         return abstractRequestContext;
708     }
709
710     @Override
711     public void onStateAcquired(final ContextChainState state) {
712         hasState.set(true);
713     }
714
715     private class DeviceFlowRegistryCallback implements FutureCallback<List<Optional<FlowCapableNode>>> {
716         private final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill;
717         private final ContextChainMastershipWatcher contextChainMastershipWatcher;
718
719         DeviceFlowRegistryCallback(
720                 ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill,
721                 ContextChainMastershipWatcher contextChainMastershipWatcher) {
722             this.deviceFlowRegistryFill = deviceFlowRegistryFill;
723             this.contextChainMastershipWatcher = contextChainMastershipWatcher;
724         }
725
726         @Override
727         public void onSuccess(List<Optional<FlowCapableNode>> result) {
728             if (LOG.isDebugEnabled()) {
729                 // Count all flows we read from datastore for debugging purposes.
730                 // This number do not always represent how many flows were actually added
731                 // to DeviceFlowRegistry, because of possible duplicates.
732                 long flowCount = Optional.ofNullable(result)
733                         .map(Collections::singleton)
734                         .orElse(Collections.emptySet())
735                         .stream()
736                         .flatMap(Collection::stream)
737                         .filter(Objects::nonNull)
738                         .flatMap(flowCapableNodeOptional
739                             -> com.google.common.base.Optional.fromJavaUtil(flowCapableNodeOptional).asSet().stream())
740                         .filter(Objects::nonNull)
741                         .filter(flowCapableNode -> flowCapableNode.getTable() != null)
742                         .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
743                         .filter(Objects::nonNull)
744                         .filter(table -> table.getFlow() != null)
745                         .flatMap(table -> table.getFlow().stream())
746                         .filter(Objects::nonNull)
747                         .count();
748
749                 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo);
750             }
751         }
752
753         @Override
754         public void onFailure(Throwable throwable) {
755             if (deviceFlowRegistryFill.isCancelled()) {
756                 if (LOG.isDebugEnabled()) {
757                     LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo);
758                 }
759             } else {
760                 LOG.warn("Failed filling flow registry with flows for node: {}", deviceInfo, throwable);
761             }
762             contextChainMastershipWatcher.onNotAbleToStartMastership(
763                     deviceInfo,
764                     "Was not able to fill flow registry on device",
765                     false);
766         }
767     }
768
769 }