Fix transaction manager closing.
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import io.netty.util.HashedWheelTimer;
15 import java.util.Collection;
16 import java.util.Collections;
17 import java.util.HashSet;
18 import java.util.List;
19 import java.util.Objects;
20 import java.util.Optional;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import javax.annotation.Nonnull;
27 import javax.annotation.Nullable;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
30 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
33 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
34 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
35 import org.opendaylight.openflowplugin.api.OFConstants;
36 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
40 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
41 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
42 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
43 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
44 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
45 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
46 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
48 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
49 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
50 import org.opendaylight.openflowplugin.api.openflow.registry.ItemLifeCycleRegistry;
51 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
52 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor;
53 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryKey;
54 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
55 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
56 import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleKeeper;
57 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
58 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
59 import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
60 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
61 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
62 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
63 import org.opendaylight.openflowplugin.extension.api.exception.ConversionException;
64 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
65 import org.opendaylight.openflowplugin.impl.common.ItemLifeCycleSourceImpl;
66 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
67 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
68 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
69 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
70 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
71 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
72 import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
73 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
74 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
75 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
76 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
77 import org.opendaylight.openflowplugin.impl.util.MatchUtil;
78 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
79 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.Match;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
102 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
103 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketIn;
104 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
105 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder;
106 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
107 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
108 import org.opendaylight.yangtools.yang.binding.DataContainer;
109 import org.opendaylight.yangtools.yang.binding.DataObject;
110 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
111 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
112 import org.slf4j.Logger;
113 import org.slf4j.LoggerFactory;
114
115 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper {
116
117     private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
118
119     // TODO: drain factor should be parametrized
120     private static final float REJECTED_DRAIN_FACTOR = 0.25f;
121     // TODO: low water mark factor should be parametrized
122     private static final float LOW_WATERMARK_FACTOR = 0.75f;
123     // TODO: high water mark factor should be parametrized
124     private static final float HIGH_WATERMARK_FACTOR = 0.95f;
125
126     // Timeout in milliseconds after what we will give up on initializing device
127     private static final int DEVICE_INIT_TIMEOUT = 9000;
128
129     // Timeout in milliseconds after what we will give up on closing transaction chain
130     private static final int TX_CHAIN_CLOSE_TIMEOUT = 10000;
131
132     private static final int LOW_WATERMARK = 1000;
133     private static final int HIGH_WATERMARK = 2000;
134
135     private final MultipartWriterProvider writerProvider;
136     private final HashedWheelTimer hashedWheelTimer;
137     private final DeviceState deviceState;
138     private final DataBroker dataBroker;
139     private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
140     private final MessageSpy messageSpy;
141     private final ItemLifeCycleKeeper flowLifeCycleKeeper;
142     private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
143     private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
144     private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight
145             .flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
146     private final TranslatorLibrary translatorLibrary;
147     private final ItemLifeCycleRegistry itemLifeCycleSourceRegistry;
148     private final ConvertorExecutor convertorExecutor;
149     private final DeviceInitializerProvider deviceInitializerProvider;
150     private final PacketInRateLimiter packetInLimiter;
151     private final DeviceInfo deviceInfo;
152     private final ConnectionContext primaryConnectionContext;
153     private final boolean skipTableFeatures;
154     private final boolean switchFeaturesMandatory;
155     private final boolean isFlowRemovedNotificationOn;
156     private final boolean useSingleLayerSerialization;
157     private final AtomicBoolean initialized = new AtomicBoolean(false);
158     private final AtomicBoolean hasState = new AtomicBoolean(false);
159     private final AtomicBoolean isInitialTransactionSubmitted = new AtomicBoolean(false);
160     private NotificationPublishService notificationPublishService;
161     private TransactionChainManager transactionChainManager;
162     private DeviceFlowRegistry deviceFlowRegistry;
163     private DeviceGroupRegistry deviceGroupRegistry;
164     private DeviceMeterRegistry deviceMeterRegistry;
165     private ExtensionConverterProvider extensionConverterProvider;
166     private ContextChainMastershipWatcher contextChainMastershipWatcher;
167
168     DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
169                       @Nonnull final DataBroker dataBroker,
170                       @Nonnull final MessageSpy messageSpy,
171                       @Nonnull final TranslatorLibrary translatorLibrary,
172                       final ConvertorExecutor convertorExecutor,
173                       final boolean skipTableFeatures,
174                       final HashedWheelTimer hashedWheelTimer,
175                       final boolean useSingleLayerSerialization,
176                       final DeviceInitializerProvider deviceInitializerProvider,
177                       final boolean isFlowRemovedNotificationOn,
178                       final boolean switchFeaturesMandatory) {
179
180         this.primaryConnectionContext = primaryConnectionContext;
181         this.deviceInfo = primaryConnectionContext.getDeviceInfo();
182         this.hashedWheelTimer = hashedWheelTimer;
183         this.deviceInitializerProvider = deviceInitializerProvider;
184         this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
185         this.switchFeaturesMandatory = switchFeaturesMandatory;
186         this.deviceState = new DeviceStateImpl();
187         this.dataBroker = dataBroker;
188         this.messageSpy = messageSpy;
189
190         this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
191                 /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
192
193         this.translatorLibrary = translatorLibrary;
194         this.portStatusTranslator = translatorLibrary.lookupTranslator(
195                 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
196         this.packetInTranslator = translatorLibrary.lookupTranslator(
197                 new TranslatorKey(deviceInfo.getVersion(), org.opendaylight.yang.gen.v1.urn.opendaylight.openflow
198                         .protocol.rev130731
199                         .PacketIn.class.getName()));
200         this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
201                 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
202
203         this.itemLifeCycleSourceRegistry = new ItemLifeCycleRegistryImpl();
204         this.flowLifeCycleKeeper = new ItemLifeCycleSourceImpl();
205         this.itemLifeCycleSourceRegistry.registerLifeCycleSource(flowLifeCycleKeeper);
206         this.convertorExecutor = convertorExecutor;
207         this.skipTableFeatures = skipTableFeatures;
208         this.useSingleLayerSerialization = useSingleLayerSerialization;
209         writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
210     }
211
212     @Override
213     public boolean initialSubmitTransaction() {
214         if (!initialized.get()) {
215             return false;
216         }
217
218         final boolean initialSubmit = transactionChainManager.initialSubmitWriteTransaction();
219         isInitialTransactionSubmitted.set(initialSubmit);
220         return initialSubmit;
221     }
222
223     @Override
224     public DeviceState getDeviceState() {
225         return deviceState;
226     }
227
228     @Override
229     public ReadOnlyTransaction getReadTransaction() {
230         return dataBroker.newReadOnlyTransaction();
231     }
232
233     @Override
234     public boolean isTransactionsEnabled() {
235         return isInitialTransactionSubmitted.get();
236     }
237
238     @Override
239     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
240                                                           final InstanceIdentifier<T> path,
241                                                           final T data) {
242         if (initialized.get()) {
243             transactionChainManager.writeToTransaction(store, path, data, false);
244         }
245     }
246
247     @Override
248     public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
249                                                                          final InstanceIdentifier<T> path,
250                                                                          final T data) {
251         if (initialized.get()) {
252             transactionChainManager.writeToTransaction(store, path, data, true);
253         }
254     }
255
256     @Override
257     public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store,
258                                                           final InstanceIdentifier<T> path) {
259         if (initialized.get()) {
260             transactionChainManager.addDeleteOperationTotTxChain(store, path);
261         }
262     }
263
264     @Override
265     public boolean submitTransaction() {
266         return initialized.get() && transactionChainManager.submitTransaction();
267     }
268
269     @Override
270     public ConnectionContext getPrimaryConnectionContext() {
271         return primaryConnectionContext;
272     }
273
274     @Override
275     public DeviceFlowRegistry getDeviceFlowRegistry() {
276         return deviceFlowRegistry;
277     }
278
279     @Override
280     public DeviceGroupRegistry getDeviceGroupRegistry() {
281         return deviceGroupRegistry;
282     }
283
284     @Override
285     public DeviceMeterRegistry getDeviceMeterRegistry() {
286         return deviceMeterRegistry;
287     }
288
289     @Override
290     public void processReply(final OfHeader ofHeader) {
291         messageSpy.spyMessage(
292                 ofHeader.getImplementedInterface(),
293                 (ofHeader instanceof Error)
294                         ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
295                         : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
296     }
297
298     @Override
299     public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
300         ofHeaderList.forEach(header -> messageSpy.spyMessage(
301                 header.getImplementedInterface(),
302                 (header instanceof Error)
303                         ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
304                         : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
305     }
306
307     @Override
308     public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
309         //1. translate to general flow (table, priority, match, cookie)
310         final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification =
311                 flowRemovedTranslator.translate(flowRemoved, deviceInfo, null);
312
313         if (isFlowRemovedNotificationOn) {
314             // Trigger off a notification
315             notificationPublishService.offerNotification(flowRemovedNotification);
316         }
317
318         final ItemLifecycleListener itemLifecycleListener = flowLifeCycleKeeper.getItemLifecycleListener();
319         if (itemLifecycleListener != null) {
320             //2. create registry key
321             final FlowRegistryKey flowRegKey = FlowRegistryKeyFactory.create(getDeviceInfo().getVersion(),
322                     flowRemovedNotification);
323             //3. lookup flowId
324             final FlowDescriptor flowDescriptor = deviceFlowRegistry.retrieveDescriptor(flowRegKey);
325             //4. if flowId present:
326             if (flowDescriptor != null) {
327                 // a) construct flow path
328                 final KeyedInstanceIdentifier<Flow, FlowKey> flowPath = getDeviceInfo().getNodeInstanceIdentifier()
329                         .augmentation(FlowCapableNode.class)
330                         .child(Table.class, flowDescriptor.getTableKey())
331                         .child(Flow.class, new FlowKey(flowDescriptor.getFlowId()));
332                 // b) notify listener
333                 itemLifecycleListener.onRemoved(flowPath);
334             } else {
335                 LOG.debug("flow id not found: nodeId={} tableId={}, priority={}",
336                         getDeviceInfo().getNodeId(), flowRegKey.getTableId(), flowRemovedNotification.getPriority());
337             }
338         }
339     }
340
341     @Override
342     @SuppressWarnings("checkstyle:IllegalCatch")
343     public void processPortStatusMessage(final PortStatusMessage portStatus) {
344         messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.StatisticsGroup
345                 .FROM_SWITCH_PUBLISHED_SUCCESS);
346
347         if (initialized.get()) {
348             try {
349                 writePortStatusMessage(portStatus);
350                 submitTransaction();
351             } catch (final Exception e) {
352                 LOG.warn("Error processing port status message for port {} on device {}",
353                         portStatus.getPortNo(), getDeviceInfo(), e);
354             }
355         } else if (!hasState.get()) {
356             primaryConnectionContext.handlePortStatusMessage(portStatus);
357         }
358     }
359
360     private void writePortStatusMessage(final PortStatus portStatusMessage) {
361         final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
362                 .translate(portStatusMessage, getDeviceInfo(), null);
363
364         final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
365                 .getNodeInstanceIdentifier()
366                 .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
367                         .nodeConnectorIdfromDatapathPortNo(
368                                 deviceInfo.getDatapathId(),
369                                 portStatusMessage.getPortNo(),
370                                 OpenflowVersion.get(deviceInfo.getVersion()))));
371
372         if (PortReason.OFPPRADD.equals(portStatusMessage.getReason())
373                 || PortReason.OFPPRMODIFY.equals(portStatusMessage.getReason())) {
374             // because of ADD status node connector has to be created
375             writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
376                     .setKey(iiToNodeConnector.getKey())
377                     .addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new
378                             FlowCapableNodeConnectorStatisticsDataBuilder().build())
379                     .addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector)
380                     .build());
381         } else if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
382             addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
383         }
384     }
385
386     @Override
387     public void processPacketInMessage(final PacketInMessage packetInMessage) {
388         final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
389         handlePacketInMessage(packetReceived, packetInMessage.getImplementedInterface(), packetReceived.getMatch());
390     }
391
392     private void handlePacketInMessage(final PacketIn packetIn,
393                                        final Class<?> implementedInterface,
394                                        final Match match) {
395         messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH);
396         final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
397
398         if (packetIn == null) {
399             LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
400             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
401             return;
402         }
403
404         final OpenflowVersion openflowVersion = OpenflowVersion.get(deviceInfo.getVersion());
405
406         // Try to get ingress from match
407         final NodeConnectorRef nodeConnectorRef = Objects.nonNull(packetIn.getIngress())
408                 ? packetIn.getIngress() : Optional.ofNullable(match)
409                 .map(Match::getInPort)
410                 .map(nodeConnectorId -> InventoryDataServiceUtil
411                         .portNumberfromNodeConnectorId(
412                                 openflowVersion,
413                                 nodeConnectorId))
414                 .map(portNumber -> InventoryDataServiceUtil
415                         .nodeConnectorRefFromDatapathIdPortno(
416                                 deviceInfo.getDatapathId(),
417                                 portNumber,
418                                 openflowVersion))
419                 .orElse(null);
420
421         messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
422
423         if (!packetInLimiter.acquirePermit()) {
424             LOG.debug("Packet limited");
425             // TODO: save packet into emergency slot if possible
426             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
427                     .FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
428             return;
429         }
430
431         final ListenableFuture<?> offerNotification = notificationPublishService
432                 .offerNotification(new PacketReceivedBuilder(packetIn)
433                         .setIngress(nodeConnectorRef)
434                         .setMatch(MatchUtil.transformMatch(match,
435                                 org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.packet.received
436                                         .Match.class))
437                         .build());
438
439         if (NotificationPublishService.REJECTED.equals(offerNotification)) {
440             LOG.debug("notification offer rejected");
441             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
442             packetInLimiter.drainLowWaterMark();
443             packetInLimiter.releasePermit();
444             return;
445         }
446
447         Futures.addCallback(offerNotification, new FutureCallback<Object>() {
448             @Override
449             public void onSuccess(final Object result) {
450                 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
451                 packetInLimiter.releasePermit();
452             }
453
454             @Override
455             public void onFailure(final Throwable throwable) {
456                 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
457                         .FROM_SWITCH_NOTIFICATION_REJECTED);
458                 LOG.debug("notification offer failed: {}", throwable.getMessage());
459                 LOG.trace("notification offer failed..", throwable);
460                 packetInLimiter.releasePermit();
461             }
462         });
463     }
464
465     @Override
466     public void processExperimenterMessage(final ExperimenterMessage notification) {
467         // lookup converter
468         final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
469         final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
470                 getDeviceInfo().getVersion(),
471                 (Class<? extends ExperimenterDataOfChoice>) vendorData.getImplementedInterface());
472         final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter =
473                 extensionConverterProvider.getMessageConverter(key);
474         if (messageConverter == null) {
475             LOG.warn("custom converter for {}[OF:{}] not found",
476                     notification.getExperimenterDataOfChoice().getImplementedInterface(),
477                     getDeviceInfo().getVersion());
478             return;
479         }
480         // build notification
481         final ExperimenterMessageOfChoice messageOfChoice;
482         try {
483             messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
484             final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new
485                     ExperimenterMessageFromDevBuilder()
486                     .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
487                     .setExperimenterMessageOfChoice(messageOfChoice);
488             // publish
489             notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
490         } catch (final ConversionException e) {
491             LOG.error("Conversion of experimenter notification failed", e);
492         }
493     }
494
495     @Override
496     public boolean processAlienMessage(final OfHeader message) {
497         final Class<? extends DataContainer> implementedInterface = message.getImplementedInterface();
498
499         if (Objects.nonNull(implementedInterface) && org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
500                 .rev130709.PacketInMessage.class.equals(implementedInterface)) {
501             final org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
502                     .PacketInMessage packetInMessage = org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
503                     .rev130709.PacketInMessage.class.cast(message);
504
505             handlePacketInMessage(packetInMessage, implementedInterface, packetInMessage.getMatch());
506             return true;
507         }
508
509         return false;
510     }
511
512     @Override
513     public TranslatorLibrary oook() {
514         return translatorLibrary;
515     }
516
517     @Override
518     public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
519         this.notificationPublishService = notificationPublishService;
520     }
521
522     @Override
523     public MessageSpy getMessageSpy() {
524         return messageSpy;
525     }
526
527     @Override
528     public void onPublished() {
529         primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
530     }
531
532     @Override
533     public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>>
534                                                                                       requestContext) {
535         return new MultiMsgCollectorImpl<>(this, requestContext);
536     }
537
538     @Override
539     public void updatePacketInRateLimit(final long upperBound) {
540         packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound),
541                 (int) (HIGH_WATERMARK_FACTOR * upperBound));
542     }
543
544     @Override
545     public ItemLifeCycleRegistry getItemLifeCycleSourceRegistry() {
546         return itemLifeCycleSourceRegistry;
547     }
548
549     @Override
550     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
551         this.extensionConverterProvider = extensionConverterProvider;
552     }
553
554     @Override
555     public ExtensionConverterProvider getExtensionConverterProvider() {
556         return extensionConverterProvider;
557     }
558
559     @VisibleForTesting
560     TransactionChainManager getTransactionChainManager() {
561         return this.transactionChainManager;
562     }
563
564     @Override
565     public ListenableFuture<Void> closeServiceInstance() {
566         final ListenableFuture<Void> listenableFuture = initialized.get()
567                 ? transactionChainManager.deactivateTransactionManager()
568                 : Futures.immediateFuture(null);
569
570         hashedWheelTimer.newTimeout((timerTask) -> {
571             if (!listenableFuture.isDone() && !listenableFuture.isCancelled()) {
572                 listenableFuture.cancel(true);
573             }
574         }, TX_CHAIN_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS);
575
576         return listenableFuture;
577     }
578
579     @Override
580     public DeviceInfo getDeviceInfo() {
581         return this.deviceInfo;
582     }
583
584     @Override
585     public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher) {
586         this.contextChainMastershipWatcher = contextChainMastershipWatcher;
587     }
588
589     @Nonnull
590     @Override
591     public ServiceGroupIdentifier getIdentifier() {
592         return deviceInfo.getServiceIdentifier();
593     }
594
595     @Override
596     public void close() {
597         // Close all datastore registries and transactions
598         if (initialized.getAndSet(false)) {
599             deviceGroupRegistry.close();
600             deviceFlowRegistry.close();
601             deviceMeterRegistry.close();
602
603             final ListenableFuture<Void> txChainShuttingDown = transactionChainManager.shuttingDown();
604
605             Futures.addCallback(txChainShuttingDown, new FutureCallback<Void>() {
606                 @Override
607                 public void onSuccess(@Nullable final Void result) {
608                     transactionChainManager.close();
609                     transactionChainManager = null;
610                 }
611
612                 @Override
613                 public void onFailure(final Throwable throwable) {
614                     transactionChainManager.close();
615                     transactionChainManager = null;
616                 }
617             });
618         }
619
620         requestContexts.forEach(requestContext -> RequestContextUtil
621                 .closeRequestContextWithRpcError(requestContext, "Connection closed."));
622         requestContexts.clear();
623     }
624
625     @Override
626     public boolean canUseSingleLayerSerialization() {
627         return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
628     }
629
630     @Override
631     @SuppressWarnings("checkstyle:IllegalCatch")
632     public void instantiateServiceInstance() {
633         lazyTransactionManagerInitialization();
634
635         try {
636             final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
637                     .retrieveAndClearPortStatusMessages();
638
639             portStatusMessages.forEach(this::writePortStatusMessage);
640             submitTransaction();
641         } catch (final Exception ex) {
642             throw new RuntimeException(String.format("Error processing port status messages from device %s: %s",
643                     deviceInfo.toString(),
644                     ex.toString()));
645         }
646
647         final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
648                 .lookup(deviceInfo.getVersion());
649
650         if (initializer.isPresent()) {
651             final Future<Void> initialize = initializer
652                     .get()
653                     .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor);
654
655             try {
656                 initialize.get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
657             } catch (TimeoutException ex) {
658                 initialize.cancel(true);
659                 throw new RuntimeException(String.format("Failed to initialize device %s in %ss: %s",
660                         deviceInfo.toString(),
661                         String.valueOf(DEVICE_INIT_TIMEOUT / 1000),
662                         ex.toString()));
663             } catch (ExecutionException | InterruptedException ex) {
664                 throw new RuntimeException(String.format("Device %s cannot be initialized: %s",
665                         deviceInfo.toString(),
666                         ex.toString()));
667             }
668         } else {
669             throw new RuntimeException(String.format("Unsupported version %s for device %s",
670                     deviceInfo.getVersion(),
671                     deviceInfo.toString()));
672         }
673
674         final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill =
675                 getDeviceFlowRegistry().fill();
676         Futures.addCallback(deviceFlowRegistryFill,
677                 new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher));
678     }
679
680     @VisibleForTesting
681     void lazyTransactionManagerInitialization() {
682         if (!this.initialized.get()) {
683             if (LOG.isDebugEnabled()) {
684                 LOG.debug("Transaction chain manager for node {} created", deviceInfo);
685             }
686             this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo.getNodeId().getValue());
687             this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker,
688                     deviceInfo.getNodeInstanceIdentifier());
689             this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
690             this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
691         }
692
693         transactionChainManager.activateTransactionManager();
694         initialized.set(true);
695     }
696
697     @Nullable
698     @Override
699     public <T> RequestContext<T> createRequestContext() {
700         final Long xid = deviceInfo.reserveXidForDeviceMessage();
701
702         final AbstractRequestContext<T> abstractRequestContext = new AbstractRequestContext<T>(xid) {
703             @Override
704             public void close() {
705                 requestContexts.remove(this);
706             }
707         };
708
709         requestContexts.add(abstractRequestContext);
710         return abstractRequestContext;
711     }
712
713     @Override
714     public void onStateAcquired(final ContextChainState state) {
715         hasState.set(true);
716     }
717
718     private class DeviceFlowRegistryCallback implements FutureCallback<List<com.google.common.base
719             .Optional<FlowCapableNode>>> {
720         private final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill;
721         private final ContextChainMastershipWatcher contextChainMastershipWatcher;
722
723         DeviceFlowRegistryCallback(
724                 ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill,
725                 ContextChainMastershipWatcher contextChainMastershipWatcher) {
726             this.deviceFlowRegistryFill = deviceFlowRegistryFill;
727             this.contextChainMastershipWatcher = contextChainMastershipWatcher;
728         }
729
730         @Override
731         public void onSuccess(@Nullable List<com.google.common.base.Optional<FlowCapableNode>> result) {
732             if (LOG.isDebugEnabled()) {
733                 // Count all flows we read from datastore for debugging purposes.
734                 // This number do not always represent how many flows were actually added
735                 // to DeviceFlowRegistry, because of possible duplicates.
736                 long flowCount = Optional.ofNullable(result)
737                         .map(Collections::singleton)
738                         .orElse(Collections.emptySet())
739                         .stream()
740                         .flatMap(Collection::stream)
741                         .filter(Objects::nonNull)
742                         .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
743                         .filter(Objects::nonNull)
744                         .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
745                         .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
746                         .filter(Objects::nonNull)
747                         .filter(table -> Objects.nonNull(table.getFlow()))
748                         .flatMap(table -> table.getFlow().stream())
749                         .filter(Objects::nonNull)
750                         .count();
751
752                 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo
753                         );
754             }
755             this.contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState
756                     .INITIAL_FLOW_REGISTRY_FILL);
757         }
758
759         @Override
760         public void onFailure(Throwable throwable) {
761             if (deviceFlowRegistryFill.isCancelled()) {
762                 if (LOG.isDebugEnabled()) {
763                     LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo);
764                 }
765             } else {
766                 LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo
767                         , throwable);
768             }
769             contextChainMastershipWatcher.onNotAbleToStartMastership(
770                     deviceInfo,
771                     "Was not able to fill flow registry on device",
772                     false);
773         }
774     }
775
776 }