Bump odlparent to 5.0.0
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import io.netty.util.HashedWheelTimer;
17 import java.util.Collection;
18 import java.util.Collections;
19 import java.util.List;
20 import java.util.Objects;
21 import java.util.Optional;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27 import java.util.concurrent.atomic.AtomicBoolean;
28 import javax.annotation.Nonnull;
29 import javax.annotation.Nullable;
30 import org.opendaylight.mdsal.binding.api.DataBroker;
31 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
32 import org.opendaylight.mdsal.binding.api.ReadTransaction;
33 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
34 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
35 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
36 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
37 import org.opendaylight.openflowplugin.api.OFConstants;
38 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
42 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
43 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
44 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
45 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
46 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
48 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder;
49 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
50 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
51 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
52 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
53 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
54 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
55 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
56 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
57 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
58 import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
59 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
60 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
61 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
62 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
63 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
64 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
65 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
66 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
67 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
68 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
69 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
70 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
71 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
72 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
73 import org.opendaylight.openflowplugin.impl.util.MatchUtil;
74 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
75 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.Match;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketIn;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
101 import org.opendaylight.yangtools.yang.binding.DataContainer;
102 import org.opendaylight.yangtools.yang.binding.DataObject;
103 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
104 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
105 import org.slf4j.Logger;
106 import org.slf4j.LoggerFactory;
107
108 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper {
109
110     private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
111
112     // TODO: drain factor should be parametrized
113     private static final float REJECTED_DRAIN_FACTOR = 0.25f;
114     // TODO: low water mark factor should be parametrized
115     private static final float LOW_WATERMARK_FACTOR = 0.75f;
116     // TODO: high water mark factor should be parametrized
117     private static final float HIGH_WATERMARK_FACTOR = 0.95f;
118
119     // Timeout in milliseconds after which we give up on initializing the device
120     private static final int DEVICE_INIT_TIMEOUT = 9000;
121
122     // Timeout in milliseconds after which we give up on closing the transaction chain
123     private static final int TX_CHAIN_CLOSE_TIMEOUT = 10000;
124
125     private static final int LOW_WATERMARK = 1000;
126     private static final int HIGH_WATERMARK = 2000;
127
128     private final MultipartWriterProvider writerProvider;
129     private final HashedWheelTimer hashedWheelTimer;
130     private final DeviceState deviceState;
131     private final DataBroker dataBroker;
132     private final Collection<RequestContext<?>> requestContexts = ConcurrentHashMap.newKeySet();
133     private final MessageSpy messageSpy;
134     private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
135     private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
136     private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight
137             .flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
138     private final TranslatorLibrary translatorLibrary;
139     private final ConvertorExecutor convertorExecutor;
140     private final DeviceInitializerProvider deviceInitializerProvider;
141     private final PacketInRateLimiter packetInLimiter;
142     private final DeviceInfo deviceInfo;
143     private final ConnectionContext primaryConnectionContext;
144     private final boolean skipTableFeatures;
145     private final boolean switchFeaturesMandatory;
146     private final boolean isFlowRemovedNotificationOn;
147     private final boolean useSingleLayerSerialization;
148     private final AtomicBoolean initialized = new AtomicBoolean(false);
149     private final AtomicBoolean hasState = new AtomicBoolean(false);
150     private final AtomicBoolean isInitialTransactionSubmitted = new AtomicBoolean(false);
151     private final ContextChainHolder contextChainHolder;
152     private NotificationPublishService notificationPublishService;
153     private TransactionChainManager transactionChainManager;
154     private DeviceFlowRegistry deviceFlowRegistry;
155     private DeviceGroupRegistry deviceGroupRegistry;
156     private DeviceMeterRegistry deviceMeterRegistry;
157     private ExtensionConverterProvider extensionConverterProvider;
158     private ContextChainMastershipWatcher contextChainMastershipWatcher;
159
160     DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
161                       @Nonnull final DataBroker dataBroker,
162                       @Nonnull final MessageSpy messageSpy,
163                       @Nonnull final TranslatorLibrary translatorLibrary,
164                       final ConvertorExecutor convertorExecutor,
165                       final boolean skipTableFeatures,
166                       final HashedWheelTimer hashedWheelTimer,
167                       final boolean useSingleLayerSerialization,
168                       final DeviceInitializerProvider deviceInitializerProvider,
169                       final boolean isFlowRemovedNotificationOn,
170                       final boolean switchFeaturesMandatory,
171                       final ContextChainHolder contextChainHolder) {
172
173         this.primaryConnectionContext = primaryConnectionContext;
174         this.deviceInfo = primaryConnectionContext.getDeviceInfo();
175         this.hashedWheelTimer = hashedWheelTimer;
176         this.deviceInitializerProvider = deviceInitializerProvider;
177         this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
178         this.switchFeaturesMandatory = switchFeaturesMandatory;
179         this.deviceState = new DeviceStateImpl();
180         this.dataBroker = dataBroker;
181         this.messageSpy = messageSpy;
182         this.contextChainHolder = contextChainHolder;
183
184         this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
185                 /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
186
187         this.translatorLibrary = translatorLibrary;
188         this.portStatusTranslator = translatorLibrary.lookupTranslator(
189                 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
190         this.packetInTranslator = translatorLibrary.lookupTranslator(
191                 new TranslatorKey(deviceInfo.getVersion(), org.opendaylight.yang.gen.v1.urn.opendaylight.openflow
192                         .protocol.rev130731
193                         .PacketIn.class.getName()));
194         this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
195                 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
196
197         this.convertorExecutor = convertorExecutor;
198         this.skipTableFeatures = skipTableFeatures;
199         this.useSingleLayerSerialization = useSingleLayerSerialization;
200         writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
201     }
202
203     @Override
204     public boolean initialSubmitTransaction() {
205         if (!initialized.get()) {
206             return false;
207         }
208
209         final boolean initialSubmit = transactionChainManager.initialSubmitWriteTransaction();
210         isInitialTransactionSubmitted.set(initialSubmit);
211         return initialSubmit;
212     }
213
214     @Override
215     public DeviceState getDeviceState() {
216         return deviceState;
217     }
218
219     @Override
220     public ReadTransaction getReadTransaction() {
221         return dataBroker.newReadOnlyTransaction();
222     }
223
224     @Override
225     public boolean isTransactionsEnabled() {
226         return isInitialTransactionSubmitted.get();
227     }
228
229     @Override
230     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
231                                                           final InstanceIdentifier<T> path,
232                                                           final T data) {
233         if (initialized.get()) {
234             transactionChainManager.writeToTransaction(store, path, data, false);
235         }
236     }
237
238     @Override
239     public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
240                                                                          final InstanceIdentifier<T> path,
241                                                                          final T data) {
242         if (initialized.get()) {
243             transactionChainManager.writeToTransaction(store, path, data, true);
244         }
245     }
246
247     @Override
248     public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store,
249                                                           final InstanceIdentifier<T> path) {
250         if (initialized.get()) {
251             transactionChainManager.addDeleteOperationToTxChain(store, path);
252         }
253     }
254
255     @Override
256     public boolean submitTransaction() {
257         return initialized.get() && transactionChainManager.submitTransaction();
258     }
259
260     @Override
261     public boolean syncSubmitTransaction() {
262         return initialized.get() && transactionChainManager.submitTransaction(true);
263     }
264
265     @Override
266     public ConnectionContext getPrimaryConnectionContext() {
267         return primaryConnectionContext;
268     }
269
270     @Override
271     public DeviceFlowRegistry getDeviceFlowRegistry() {
272         return deviceFlowRegistry;
273     }
274
275     @Override
276     public DeviceGroupRegistry getDeviceGroupRegistry() {
277         return deviceGroupRegistry;
278     }
279
280     @Override
281     public DeviceMeterRegistry getDeviceMeterRegistry() {
282         return deviceMeterRegistry;
283     }
284
285     @Override
286     public void processReply(final OfHeader ofHeader) {
287         messageSpy.spyMessage(
288                 ofHeader.getImplementedInterface(),
289                 ofHeader instanceof Error
290                         ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
291                         : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
292     }
293
294     @Override
295     public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
296         ofHeaderList.forEach(header -> messageSpy.spyMessage(
297                 header.getImplementedInterface(),
298                 header instanceof Error
299                         ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
300                         : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
301     }
302
303     @Override
304     public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
305         if (isMasterOfDevice()) {
306             //1. translate to general flow (table, priority, match, cookie)
307             final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819
308                     .FlowRemoved flowRemovedNotification = flowRemovedTranslator
309                     .translate(flowRemoved, deviceInfo, null);
310
311             if (isFlowRemovedNotificationOn) {
312                 // Trigger off a notification
313                 notificationPublishService.offerNotification(flowRemovedNotification);
314             }
315         } else {
316             LOG.debug("Controller is not owner of the device {}, skipping Flow Removed message",
317                     deviceInfo.getLOGValue());
318         }
319     }
320
321     @Override
322     @SuppressWarnings("checkstyle:IllegalCatch")
323     public void processPortStatusMessage(final PortStatusMessage portStatus) {
324         messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.StatisticsGroup
325                 .FROM_SWITCH_PUBLISHED_SUCCESS);
326
327         if (initialized.get()) {
328             try {
329                 writePortStatusMessage(portStatus);
330             } catch (final Exception e) {
331                 LOG.warn("Error processing port status message for port {} on device {}",
332                         portStatus.getPortNo(), getDeviceInfo(), e);
333             }
334         } else if (!hasState.get()) {
335             primaryConnectionContext.handlePortStatusMessage(portStatus);
336         }
337     }
338
339     private void writePortStatusMessage(final PortStatus portStatusMessage) {
340         final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
341                 .translate(portStatusMessage, getDeviceInfo(), null);
342
343         final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
344                 .getNodeInstanceIdentifier()
345                 .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
346                         .nodeConnectorIdfromDatapathPortNo(
347                                 deviceInfo.getDatapathId(),
348                                 portStatusMessage.getPortNo(),
349                                 OpenflowVersion.get(deviceInfo.getVersion()))));
350
351         writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
352                 .withKey(iiToNodeConnector.getKey())
353                 .addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new
354                         FlowCapableNodeConnectorStatisticsDataBuilder().build())
355                 .addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector)
356                 .build());
357         syncSubmitTransaction();
358         if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
359             addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
360             syncSubmitTransaction();
361         }
362     }
363
364     @Override
365     public void processPacketInMessage(final PacketInMessage packetInMessage) {
366         if (isMasterOfDevice()) {
367             final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
368             handlePacketInMessage(packetReceived, packetInMessage.getImplementedInterface(), packetReceived.getMatch());
369         } else {
370             LOG.debug("Controller is not owner of the device {}, skipping packet_in message", deviceInfo.getLOGValue());
371         }
372     }
373
374     private Boolean isMasterOfDevice() {
375         final ContextChain contextChain = contextChainHolder.getContextChain(deviceInfo);
376         boolean result = false;
377         if (contextChain != null) {
378             result = contextChain.isMastered(ContextChainMastershipState.CHECK, false);
379         }
380         return result;
381     }
382
383     private void handlePacketInMessage(final PacketIn packetIn,
384                                        final Class<?> implementedInterface,
385                                        final Match match) {
386         messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH);
387         final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
388
389         if (packetIn == null) {
390             LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
391             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
392             return;
393         }
394
395         final OpenflowVersion openflowVersion = OpenflowVersion.get(deviceInfo.getVersion());
396
397         // Try to get ingress from match
398         final NodeConnectorRef nodeConnectorRef = packetIn.getIngress() != null
399                 ? packetIn.getIngress() : Optional.ofNullable(match)
400                 .map(Match::getInPort)
401                 .map(nodeConnectorId -> InventoryDataServiceUtil
402                         .portNumberfromNodeConnectorId(
403                                 openflowVersion,
404                                 nodeConnectorId))
405                 .map(portNumber -> InventoryDataServiceUtil
406                         .nodeConnectorRefFromDatapathIdPortno(
407                                 deviceInfo.getDatapathId(),
408                                 portNumber,
409                                 openflowVersion))
410                 .orElse(null);
411
412         messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
413
414         if (!packetInLimiter.acquirePermit()) {
415             LOG.debug("Packet limited");
416             // TODO: save packet into emergency slot if possible
417             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
418                     .FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
419             return;
420         }
421
422         final ListenableFuture<?> offerNotification = notificationPublishService
423                 .offerNotification(new PacketReceivedBuilder(packetIn)
424                         .setIngress(nodeConnectorRef)
425                         .setMatch(MatchUtil.transformMatch(match,
426                                 org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.packet.received
427                                         .Match.class))
428                         .build());
429
430         if (NotificationPublishService.REJECTED.equals(offerNotification)) {
431             LOG.debug("notification offer rejected");
432             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
433             packetInLimiter.drainLowWaterMark();
434             packetInLimiter.releasePermit();
435             return;
436         }
437
438         Futures.addCallback(offerNotification, new FutureCallback<Object>() {
439             @Override
440             public void onSuccess(final Object result) {
441                 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
442                 packetInLimiter.releasePermit();
443             }
444
445             @Override
446             public void onFailure(final Throwable throwable) {
447                 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
448                         .FROM_SWITCH_NOTIFICATION_REJECTED);
449                 LOG.debug("notification offer failed: {}", throwable.getMessage());
450                 LOG.trace("notification offer failed..", throwable);
451                 packetInLimiter.releasePermit();
452             }
453         }, MoreExecutors.directExecutor());
454     }
455
456     @Override
457     public void processExperimenterMessage(final ExperimenterMessage notification) {
458         if (isMasterOfDevice()) {
459             // lookup converter
460             final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
461             final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
462                     getDeviceInfo().getVersion(),
463                     (Class<? extends ExperimenterDataOfChoice>) vendorData.getImplementedInterface());
464             final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter =
465                     extensionConverterProvider.getMessageConverter(key);
466             if (messageConverter == null) {
467                 LOG.warn("custom converter for {}[OF:{}] not found",
468                         notification.getExperimenterDataOfChoice().getImplementedInterface(),
469                         getDeviceInfo().getVersion());
470                 return;
471             }
472             // build notification
473             final ExperimenterMessageOfChoice messageOfChoice;
474             messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
475             final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new
476                     ExperimenterMessageFromDevBuilder()
477                     .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
478                     .setExperimenterMessageOfChoice(messageOfChoice);
479             // publish
480             notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
481         } else {
482             LOG.debug("Controller is not owner of the device {}, skipping experimenter message",
483                     deviceInfo.getLOGValue());
484         }
485     }
486
487     @Override
488     // The cast to PacketInMessage is safe as the implemented interface is verified before the cas tbut FB doesn't
489     // recognize it.
490     @SuppressFBWarnings("BC_UNCONFIRMED_CAST")
491     public boolean processAlienMessage(final OfHeader message) {
492         final Class<? extends DataContainer> implementedInterface = message.getImplementedInterface();
493
494         if (org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketInMessage.class
495                 .equals(implementedInterface)) {
496             final org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
497                     .PacketInMessage packetInMessage = (org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
498                 .rev130709.PacketInMessage) message;
499
500             handlePacketInMessage(packetInMessage, implementedInterface, packetInMessage.getMatch());
501             return true;
502         }
503
504         return false;
505     }
506
507     @Override
508     public TranslatorLibrary oook() {
509         return translatorLibrary;
510     }
511
512     @Override
513     public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
514         this.notificationPublishService = notificationPublishService;
515     }
516
517     @Override
518     public MessageSpy getMessageSpy() {
519         return messageSpy;
520     }
521
522     @Override
523     public void onPublished() {
524         primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
525     }
526
527     @Override
528     public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>>
529                                                                                       requestContext) {
530         return new MultiMsgCollectorImpl<>(this, requestContext);
531     }
532
533     @Override
534     public void updatePacketInRateLimit(final long upperBound) {
535         packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound),
536                 (int) (HIGH_WATERMARK_FACTOR * upperBound));
537     }
538
539     @Override
540     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
541         this.extensionConverterProvider = extensionConverterProvider;
542     }
543
544     @Override
545     public ExtensionConverterProvider getExtensionConverterProvider() {
546         return extensionConverterProvider;
547     }
548
549     @VisibleForTesting
550     TransactionChainManager getTransactionChainManager() {
551         return this.transactionChainManager;
552     }
553
554     @Override
555     public ListenableFuture<?> closeServiceInstance() {
556         final ListenableFuture<?> listenableFuture = initialized.get()
557                 ? transactionChainManager.deactivateTransactionManager()
558                 : Futures.immediateFuture(null);
559
560         hashedWheelTimer.newTimeout((timerTask) -> {
561             if (!listenableFuture.isDone() && !listenableFuture.isCancelled()) {
562                 listenableFuture.cancel(true);
563             }
564         }, TX_CHAIN_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS);
565
566         return listenableFuture;
567     }
568
569     @Override
570     public DeviceInfo getDeviceInfo() {
571         return this.deviceInfo;
572     }
573
574     @Override
575     public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher newWatcher) {
576         this.contextChainMastershipWatcher = newWatcher;
577     }
578
579     @Nonnull
580     @Override
581     public ServiceGroupIdentifier getIdentifier() {
582         return deviceInfo.getServiceIdentifier();
583     }
584
585     @Override
586     public void close() {
587         // Close all datastore registries and transactions
588         if (initialized.getAndSet(false)) {
589             deviceGroupRegistry.close();
590             deviceFlowRegistry.close();
591             deviceMeterRegistry.close();
592
593             final ListenableFuture<?> txChainShuttingDown = transactionChainManager.shuttingDown();
594
595             Futures.addCallback(txChainShuttingDown, new FutureCallback<Object>() {
596                 @Override
597                 public void onSuccess(final Object result) {
598                     transactionChainManager.close();
599                     transactionChainManager = null;
600                 }
601
602                 @Override
603                 public void onFailure(final Throwable throwable) {
604                     transactionChainManager.close();
605                     transactionChainManager = null;
606                 }
607             }, MoreExecutors.directExecutor());
608         }
609
610         requestContexts.forEach(requestContext -> RequestContextUtil
611                 .closeRequestContextWithRpcError(requestContext, "Connection closed."));
612         requestContexts.clear();
613     }
614
615     @Override
616     public boolean canUseSingleLayerSerialization() {
617         return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
618     }
619
620     // TODO: exception handling should be fixed by using custom checked exception, never RuntimeExceptions
621     @Override
622     @SuppressWarnings({"checkstyle:IllegalCatch"})
623     public void instantiateServiceInstance() {
624         lazyTransactionManagerInitialization();
625
626         try {
627             final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
628                     .retrieveAndClearPortStatusMessages();
629
630             portStatusMessages.forEach(this::writePortStatusMessage);
631             submitTransaction();
632         } catch (final Exception ex) {
633             throw new RuntimeException(String.format("Error processing port status messages from device %s: %s",
634                     deviceInfo.toString(), ex.toString()), ex);
635         }
636
637         final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
638                 .lookup(deviceInfo.getVersion());
639
640         if (initializer.isPresent()) {
641             final Future<Void> initialize = initializer
642                     .get()
643                     .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor);
644
645             try {
646                 initialize.get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
647             } catch (TimeoutException ex) {
648                 initialize.cancel(true);
649                 throw new RuntimeException(String.format("Failed to initialize device %s in %ss: %s",
650                         deviceInfo.toString(), String.valueOf(DEVICE_INIT_TIMEOUT / 1000), ex.toString()), ex);
651             } catch (ExecutionException | InterruptedException ex) {
652                 throw new RuntimeException(
653                         String.format("Device %s cannot be initialized: %s", deviceInfo.toString(), ex.toString()), ex);
654             }
655         } else {
656             throw new RuntimeException(String.format("Unsupported version %s for device %s",
657                     deviceInfo.getVersion(),
658                     deviceInfo.toString()));
659         }
660
661         final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill =
662                 getDeviceFlowRegistry().fill();
663         Futures.addCallback(deviceFlowRegistryFill,
664                 new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher),
665                 MoreExecutors.directExecutor());
666     }
667
668     @VisibleForTesting
669     void lazyTransactionManagerInitialization() {
670         if (!this.initialized.get()) {
671             if (LOG.isDebugEnabled()) {
672                 LOG.debug("Transaction chain manager for node {} created", deviceInfo);
673             }
674             this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo.getNodeId().getValue());
675             this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker,
676                     deviceInfo.getNodeInstanceIdentifier());
677             this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
678             this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
679         }
680
681         transactionChainManager.activateTransactionManager();
682         initialized.set(true);
683     }
684
685     @Nullable
686     @Override
687     public <T> RequestContext<T> createRequestContext() {
688         final Long xid = deviceInfo.reserveXidForDeviceMessage();
689
690         final AbstractRequestContext<T> abstractRequestContext = new AbstractRequestContext<T>(xid) {
691             @Override
692             public void close() {
693                 requestContexts.remove(this);
694             }
695         };
696
697         requestContexts.add(abstractRequestContext);
698         return abstractRequestContext;
699     }
700
701     @Override
702     public void onStateAcquired(final ContextChainState state) {
703         hasState.set(true);
704     }
705
706     private class DeviceFlowRegistryCallback implements FutureCallback<List<Optional<FlowCapableNode>>> {
707         private final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill;
708         private final ContextChainMastershipWatcher contextChainMastershipWatcher;
709
710         DeviceFlowRegistryCallback(
711                 ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill,
712                 ContextChainMastershipWatcher contextChainMastershipWatcher) {
713             this.deviceFlowRegistryFill = deviceFlowRegistryFill;
714             this.contextChainMastershipWatcher = contextChainMastershipWatcher;
715         }
716
717         @Override
718         public void onSuccess(List<Optional<FlowCapableNode>> result) {
719             if (LOG.isDebugEnabled()) {
720                 // Count all flows we read from datastore for debugging purposes.
721                 // This number do not always represent how many flows were actually added
722                 // to DeviceFlowRegistry, because of possible duplicates.
723                 long flowCount = Optional.ofNullable(result)
724                         .map(Collections::singleton)
725                         .orElse(Collections.emptySet())
726                         .stream()
727                         .flatMap(Collection::stream)
728                         .filter(Objects::nonNull)
729                         .flatMap(flowCapableNodeOptional
730                             -> com.google.common.base.Optional.fromJavaUtil(flowCapableNodeOptional).asSet().stream())
731                         .filter(Objects::nonNull)
732                         .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
733                         .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
734                         .filter(Objects::nonNull)
735                         .filter(table -> Objects.nonNull(table.getFlow()))
736                         .flatMap(table -> table.getFlow().stream())
737                         .filter(Objects::nonNull)
738                         .count();
739
740                 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo);
741             }
742             this.contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState
743                     .INITIAL_FLOW_REGISTRY_FILL);
744         }
745
746         @Override
747         public void onFailure(Throwable throwable) {
748             if (deviceFlowRegistryFill.isCancelled()) {
749                 if (LOG.isDebugEnabled()) {
750                     LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo);
751                 }
752             } else {
753                 LOG.warn("Failed filling flow registry with flows for node: {}", deviceInfo, throwable);
754             }
755             contextChainMastershipWatcher.onNotAbleToStartMastership(
756                     deviceInfo,
757                     "Was not able to fill flow registry on device",
758                     false);
759         }
760     }
761
762 }