Fix codestyle
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import io.netty.util.HashedWheelTimer;
15 import java.util.Collection;
16 import java.util.Collections;
17 import java.util.HashSet;
18 import java.util.List;
19 import java.util.Objects;
20 import java.util.Optional;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import javax.annotation.Nonnull;
27 import javax.annotation.Nullable;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
30 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
33 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
34 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
35 import org.opendaylight.openflowplugin.api.OFConstants;
36 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
40 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
41 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
42 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
43 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
44 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
45 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
46 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
48 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
49 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
50 import org.opendaylight.openflowplugin.api.openflow.registry.ItemLifeCycleRegistry;
51 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
52 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor;
53 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryKey;
54 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
55 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
56 import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleKeeper;
57 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
58 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
59 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
60 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
61 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
62 import org.opendaylight.openflowplugin.extension.api.exception.ConversionException;
63 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
64 import org.opendaylight.openflowplugin.impl.common.ItemLifeCycleSourceImpl;
65 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
66 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
67 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
68 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
69 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
70 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
71 import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
72 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
73 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
74 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
75 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
76 import org.opendaylight.openflowplugin.impl.util.MatchUtil;
77 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
78 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.Match;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
102 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketIn;
103 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
104 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder;
105 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
106 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
107 import org.opendaylight.yangtools.yang.binding.DataContainer;
108 import org.opendaylight.yangtools.yang.binding.DataObject;
109 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
110 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
111 import org.slf4j.Logger;
112 import org.slf4j.LoggerFactory;
113
114 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper {
115
116     private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
117
118     // TODO: drain factor should be parametrized
119     private static final float REJECTED_DRAIN_FACTOR = 0.25f;
120     // TODO: low water mark factor should be parametrized
121     private static final float LOW_WATERMARK_FACTOR = 0.75f;
122     // TODO: high water mark factor should be parametrized
123     private static final float HIGH_WATERMARK_FACTOR = 0.95f;
124
125     // Timeout in milliseconds after what we will give up on initializing device
126     private static final int DEVICE_INIT_TIMEOUT = 9000;
127
128     // Timeout in milliseconds after what we will give up on closing transaction chain
129     private static final int TX_CHAIN_CLOSE_TIMEOUT = 10000;
130
131     private static final int LOW_WATERMARK = 1000;
132     private static final int HIGH_WATERMARK = 2000;
133
134     private final MultipartWriterProvider writerProvider;
135     private final HashedWheelTimer hashedWheelTimer;
136     private final DeviceState deviceState;
137     private final DataBroker dataBroker;
138     private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
139     private final MessageSpy messageSpy;
140     private final ItemLifeCycleKeeper flowLifeCycleKeeper;
141     private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
142     private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
143     private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight
144             .flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
145     private final TranslatorLibrary translatorLibrary;
146     private final ItemLifeCycleRegistry itemLifeCycleSourceRegistry;
147     private final ConvertorExecutor convertorExecutor;
148     private final DeviceInitializerProvider deviceInitializerProvider;
149     private final PacketInRateLimiter packetInLimiter;
150     private final DeviceInfo deviceInfo;
151     private final ConnectionContext primaryConnectionContext;
152     private final boolean skipTableFeatures;
153     private final boolean switchFeaturesMandatory;
154     private final boolean isFlowRemovedNotificationOn;
155     private final boolean useSingleLayerSerialization;
156     private final AtomicBoolean initialized = new AtomicBoolean(false);
157     private final AtomicBoolean hasState = new AtomicBoolean(false);
158     private final AtomicBoolean isInitialTransactionSubmitted = new AtomicBoolean(false);
159     private NotificationPublishService notificationPublishService;
160     private TransactionChainManager transactionChainManager;
161     private DeviceFlowRegistry deviceFlowRegistry;
162     private DeviceGroupRegistry deviceGroupRegistry;
163     private DeviceMeterRegistry deviceMeterRegistry;
164     private ExtensionConverterProvider extensionConverterProvider;
165     private ContextChainMastershipWatcher contextChainMastershipWatcher;
166
167     DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
168                       @Nonnull final DataBroker dataBroker,
169                       @Nonnull final MessageSpy messageSpy,
170                       @Nonnull final TranslatorLibrary translatorLibrary,
171                       final ConvertorExecutor convertorExecutor,
172                       final boolean skipTableFeatures,
173                       final HashedWheelTimer hashedWheelTimer,
174                       final boolean useSingleLayerSerialization,
175                       final DeviceInitializerProvider deviceInitializerProvider,
176                       final boolean isFlowRemovedNotificationOn,
177                       final boolean switchFeaturesMandatory) {
178
179         this.primaryConnectionContext = primaryConnectionContext;
180         this.deviceInfo = primaryConnectionContext.getDeviceInfo();
181         this.hashedWheelTimer = hashedWheelTimer;
182         this.deviceInitializerProvider = deviceInitializerProvider;
183         this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
184         this.switchFeaturesMandatory = switchFeaturesMandatory;
185         this.deviceState = new DeviceStateImpl();
186         this.dataBroker = dataBroker;
187         this.messageSpy = messageSpy;
188
189         this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
190                 /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
191
192         this.translatorLibrary = translatorLibrary;
193         this.portStatusTranslator = translatorLibrary.lookupTranslator(
194                 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
195         this.packetInTranslator = translatorLibrary.lookupTranslator(
196                 new TranslatorKey(deviceInfo.getVersion(), org.opendaylight.yang.gen.v1.urn.opendaylight.openflow
197                         .protocol.rev130731
198                         .PacketIn.class.getName()));
199         this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
200                 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
201
202         this.itemLifeCycleSourceRegistry = new ItemLifeCycleRegistryImpl();
203         this.flowLifeCycleKeeper = new ItemLifeCycleSourceImpl();
204         this.itemLifeCycleSourceRegistry.registerLifeCycleSource(flowLifeCycleKeeper);
205         this.convertorExecutor = convertorExecutor;
206         this.skipTableFeatures = skipTableFeatures;
207         this.useSingleLayerSerialization = useSingleLayerSerialization;
208         writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
209     }
210
211     @Override
212     public boolean initialSubmitTransaction() {
213         if (!initialized.get()) {
214             return false;
215         }
216
217         final boolean initialSubmit = transactionChainManager.initialSubmitWriteTransaction();
218         isInitialTransactionSubmitted.set(initialSubmit);
219         return initialSubmit;
220     }
221
222     @Override
223     public DeviceState getDeviceState() {
224         return deviceState;
225     }
226
227     @Override
228     public ReadOnlyTransaction getReadTransaction() {
229         return dataBroker.newReadOnlyTransaction();
230     }
231
232     @Override
233     public boolean isTransactionsEnabled() {
234         return isInitialTransactionSubmitted.get();
235     }
236
237     @Override
238     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
239                                                           final InstanceIdentifier<T> path,
240                                                           final T data) {
241         if (initialized.get()) {
242             transactionChainManager.writeToTransaction(store, path, data, false);
243         }
244     }
245
246     @Override
247     public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
248                                                                          final InstanceIdentifier<T> path,
249                                                                          final T data) {
250         if (initialized.get()) {
251             transactionChainManager.writeToTransaction(store, path, data, true);
252         }
253     }
254
255     @Override
256     public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store,
257                                                           final InstanceIdentifier<T> path) {
258         if (initialized.get()) {
259             transactionChainManager.addDeleteOperationTotTxChain(store, path);
260         }
261     }
262
263     @Override
264     public boolean submitTransaction() {
265         return initialized.get() && transactionChainManager.submitWriteTransaction();
266     }
267
268     @Override
269     public ConnectionContext getPrimaryConnectionContext() {
270         return primaryConnectionContext;
271     }
272
273     @Override
274     public DeviceFlowRegistry getDeviceFlowRegistry() {
275         return deviceFlowRegistry;
276     }
277
278     @Override
279     public DeviceGroupRegistry getDeviceGroupRegistry() {
280         return deviceGroupRegistry;
281     }
282
283     @Override
284     public DeviceMeterRegistry getDeviceMeterRegistry() {
285         return deviceMeterRegistry;
286     }
287
288     @Override
289     public void processReply(final OfHeader ofHeader) {
290         messageSpy.spyMessage(
291                 ofHeader.getImplementedInterface(),
292                 (ofHeader instanceof Error)
293                         ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
294                         : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
295     }
296
297     @Override
298     public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
299         ofHeaderList.forEach(header -> messageSpy.spyMessage(
300                 header.getImplementedInterface(),
301                 (header instanceof Error)
302                         ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
303                         : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
304     }
305
306     @Override
307     public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
308         //1. translate to general flow (table, priority, match, cookie)
309         final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification =
310                 flowRemovedTranslator.translate(flowRemoved, deviceInfo, null);
311
312         if (isFlowRemovedNotificationOn) {
313             // Trigger off a notification
314             notificationPublishService.offerNotification(flowRemovedNotification);
315         }
316
317         final ItemLifecycleListener itemLifecycleListener = flowLifeCycleKeeper.getItemLifecycleListener();
318         if (itemLifecycleListener != null) {
319             //2. create registry key
320             final FlowRegistryKey flowRegKey = FlowRegistryKeyFactory.create(getDeviceInfo().getVersion(),
321                     flowRemovedNotification);
322             //3. lookup flowId
323             final FlowDescriptor flowDescriptor = deviceFlowRegistry.retrieveDescriptor(flowRegKey);
324             //4. if flowId present:
325             if (flowDescriptor != null) {
326                 // a) construct flow path
327                 final KeyedInstanceIdentifier<Flow, FlowKey> flowPath = getDeviceInfo().getNodeInstanceIdentifier()
328                         .augmentation(FlowCapableNode.class)
329                         .child(Table.class, flowDescriptor.getTableKey())
330                         .child(Flow.class, new FlowKey(flowDescriptor.getFlowId()));
331                 // b) notify listener
332                 itemLifecycleListener.onRemoved(flowPath);
333             } else {
334                 LOG.debug("flow id not found: nodeId={} tableId={}, priority={}",
335                         getDeviceInfo().getNodeId(), flowRegKey.getTableId(), flowRemovedNotification.getPriority());
336             }
337         }
338     }
339
340     @Override
341     @SuppressWarnings("checkstyle:IllegalCatch")
342     public void processPortStatusMessage(final PortStatusMessage portStatus) {
343         messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.StatisticsGroup
344                 .FROM_SWITCH_PUBLISHED_SUCCESS);
345
346         if (initialized.get()) {
347             try {
348                 writePortStatusMessage(portStatus);
349                 submitTransaction();
350             } catch (final Exception e) {
351                 LOG.warn("Error processing port status message for port {} on device {}",
352                         portStatus.getPortNo(), getDeviceInfo(), e);
353             }
354         } else if (!hasState.get()) {
355             primaryConnectionContext.handlePortStatusMessage(portStatus);
356         }
357     }
358
359     private void writePortStatusMessage(final PortStatus portStatusMessage) {
360         final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
361                 .translate(portStatusMessage, getDeviceInfo(), null);
362
363         final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
364                 .getNodeInstanceIdentifier()
365                 .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
366                         .nodeConnectorIdfromDatapathPortNo(
367                                 deviceInfo.getDatapathId(),
368                                 portStatusMessage.getPortNo(),
369                                 OpenflowVersion.get(deviceInfo.getVersion()))));
370
371         if (PortReason.OFPPRADD.equals(portStatusMessage.getReason())
372                 || PortReason.OFPPRMODIFY.equals(portStatusMessage.getReason())) {
373             // because of ADD status node connector has to be created
374             writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
375                     .setKey(iiToNodeConnector.getKey())
376                     .addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new
377                             FlowCapableNodeConnectorStatisticsDataBuilder().build())
378                     .addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector)
379                     .build());
380         } else if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
381             addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
382         }
383     }
384
385     @Override
386     public void processPacketInMessage(final PacketInMessage packetInMessage) {
387         final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
388         handlePacketInMessage(packetReceived, packetInMessage.getImplementedInterface(), packetReceived.getMatch());
389     }
390
391     private void handlePacketInMessage(final PacketIn packetIn,
392                                        final Class<?> implementedInterface,
393                                        final Match match) {
394         messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH);
395         final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
396
397         if (packetIn == null) {
398             LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
399             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
400             return;
401         }
402
403         final OpenflowVersion openflowVersion = OpenflowVersion.get(deviceInfo.getVersion());
404
405         // Try to get ingress from match
406         final NodeConnectorRef nodeConnectorRef = Objects.nonNull(packetIn.getIngress())
407                 ? packetIn.getIngress() : Optional.ofNullable(match)
408                 .map(Match::getInPort)
409                 .map(nodeConnectorId -> InventoryDataServiceUtil
410                         .portNumberfromNodeConnectorId(
411                                 openflowVersion,
412                                 nodeConnectorId))
413                 .map(portNumber -> InventoryDataServiceUtil
414                         .nodeConnectorRefFromDatapathIdPortno(
415                                 deviceInfo.getDatapathId(),
416                                 portNumber,
417                                 openflowVersion))
418                 .orElse(null);
419
420         messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
421
422         if (!packetInLimiter.acquirePermit()) {
423             LOG.debug("Packet limited");
424             // TODO: save packet into emergency slot if possible
425             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
426                     .FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
427             return;
428         }
429
430         final ListenableFuture<?> offerNotification = notificationPublishService
431                 .offerNotification(new PacketReceivedBuilder(packetIn)
432                         .setIngress(nodeConnectorRef)
433                         .setMatch(MatchUtil.transformMatch(match,
434                                 org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.packet.received
435                                         .Match.class))
436                         .build());
437
438         if (NotificationPublishService.REJECTED.equals(offerNotification)) {
439             LOG.debug("notification offer rejected");
440             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
441             packetInLimiter.drainLowWaterMark();
442             packetInLimiter.releasePermit();
443             return;
444         }
445
446         Futures.addCallback(offerNotification, new FutureCallback<Object>() {
447             @Override
448             public void onSuccess(final Object result) {
449                 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
450                 packetInLimiter.releasePermit();
451             }
452
453             @Override
454             public void onFailure(final Throwable throwable) {
455                 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
456                         .FROM_SWITCH_NOTIFICATION_REJECTED);
457                 LOG.debug("notification offer failed: {}", throwable.getMessage());
458                 LOG.trace("notification offer failed..", throwable);
459                 packetInLimiter.releasePermit();
460             }
461         });
462     }
463
464     @Override
465     public void processExperimenterMessage(final ExperimenterMessage notification) {
466         // lookup converter
467         final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
468         final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
469                 getDeviceInfo().getVersion(),
470                 (Class<? extends ExperimenterDataOfChoice>) vendorData.getImplementedInterface());
471         final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter =
472                 extensionConverterProvider.getMessageConverter(key);
473         if (messageConverter == null) {
474             LOG.warn("custom converter for {}[OF:{}] not found",
475                     notification.getExperimenterDataOfChoice().getImplementedInterface(),
476                     getDeviceInfo().getVersion());
477             return;
478         }
479         // build notification
480         final ExperimenterMessageOfChoice messageOfChoice;
481         try {
482             messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
483             final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new
484                     ExperimenterMessageFromDevBuilder()
485                     .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
486                     .setExperimenterMessageOfChoice(messageOfChoice);
487             // publish
488             notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
489         } catch (final ConversionException e) {
490             LOG.error("Conversion of experimenter notification failed", e);
491         }
492     }
493
494     @Override
495     public boolean processAlienMessage(final OfHeader message) {
496         final Class<? extends DataContainer> implementedInterface = message.getImplementedInterface();
497
498         if (Objects.nonNull(implementedInterface) && org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
499                 .rev130709.PacketInMessage.class.equals(implementedInterface)) {
500             final org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
501                     .PacketInMessage packetInMessage = org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
502                     .rev130709.PacketInMessage.class.cast(message);
503
504             handlePacketInMessage(packetInMessage, implementedInterface, packetInMessage.getMatch());
505             return true;
506         }
507
508         return false;
509     }
510
511     @Override
512     public TranslatorLibrary oook() {
513         return translatorLibrary;
514     }
515
516     @Override
517     public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
518         this.notificationPublishService = notificationPublishService;
519     }
520
521     @Override
522     public MessageSpy getMessageSpy() {
523         return messageSpy;
524     }
525
526     @Override
527     public void onPublished() {
528         primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
529     }
530
531     @Override
532     public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>>
533                                                                                       requestContext) {
534         return new MultiMsgCollectorImpl<>(this, requestContext);
535     }
536
537     @Override
538     public void updatePacketInRateLimit(final long upperBound) {
539         packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound),
540                 (int) (HIGH_WATERMARK_FACTOR * upperBound));
541     }
542
543     @Override
544     public ItemLifeCycleRegistry getItemLifeCycleSourceRegistry() {
545         return itemLifeCycleSourceRegistry;
546     }
547
548     @Override
549     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
550         this.extensionConverterProvider = extensionConverterProvider;
551     }
552
553     @Override
554     public ExtensionConverterProvider getExtensionConverterProvider() {
555         return extensionConverterProvider;
556     }
557
558     @VisibleForTesting
559     TransactionChainManager getTransactionChainManager() {
560         return this.transactionChainManager;
561     }
562
563     @Override
564     public ListenableFuture<Void> closeServiceInstance() {
565         final ListenableFuture<Void> listenableFuture = initialized.get()
566                 ? transactionChainManager.deactivateTransactionManager()
567                 : Futures.immediateFuture(null);
568
569         hashedWheelTimer.newTimeout((timerTask) -> {
570             if (!listenableFuture.isDone() && !listenableFuture.isCancelled()) {
571                 listenableFuture.cancel(true);
572             }
573         }, TX_CHAIN_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS);
574
575         return listenableFuture;
576     }
577
578     @Override
579     public DeviceInfo getDeviceInfo() {
580         return this.deviceInfo;
581     }
582
583     @Override
584     public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher) {
585         this.contextChainMastershipWatcher = contextChainMastershipWatcher;
586     }
587
588     @Nonnull
589     @Override
590     public ServiceGroupIdentifier getIdentifier() {
591         return deviceInfo.getServiceIdentifier();
592     }
593
594     @Override
595     public void close() {
596         // Close all datastore registries and transactions
597         if (initialized.getAndSet(false)) {
598             deviceGroupRegistry.close();
599             deviceFlowRegistry.close();
600             deviceMeterRegistry.close();
601
602             final ListenableFuture<Void> txChainShuttingDown = transactionChainManager.shuttingDown();
603
604             Futures.addCallback(txChainShuttingDown, new FutureCallback<Void>() {
605                 @Override
606                 public void onSuccess(@Nullable final Void result) {
607                     transactionChainManager.close();
608                     transactionChainManager = null;
609                 }
610
611                 @Override
612                 public void onFailure(final Throwable throwable) {
613                     transactionChainManager.close();
614                     transactionChainManager = null;
615                 }
616             });
617         }
618
619         requestContexts.forEach(requestContext -> RequestContextUtil
620                 .closeRequestContextWithRpcError(requestContext, "Connection closed."));
621         requestContexts.clear();
622     }
623
624     @Override
625     public boolean canUseSingleLayerSerialization() {
626         return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
627     }
628
629     @Override
630     @SuppressWarnings("checkstyle:IllegalCatch")
631     public void instantiateServiceInstance() {
632         lazyTransactionManagerInitialization();
633
634         try {
635             final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
636                     .retrieveAndClearPortStatusMessages();
637
638             portStatusMessages.forEach(this::writePortStatusMessage);
639             submitTransaction();
640         } catch (final Exception ex) {
641             throw new RuntimeException(String.format("Error processing port status messages from device %s: %s",
642                     deviceInfo.toString(),
643                     ex.toString()));
644         }
645
646         final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
647                 .lookup(deviceInfo.getVersion());
648
649         if (initializer.isPresent()) {
650             final Future<Void> initialize = initializer
651                     .get()
652                     .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor);
653
654             try {
655                 initialize.get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
656             } catch (TimeoutException ex) {
657                 initialize.cancel(true);
658                 throw new RuntimeException(String.format("Failed to initialize device %s in %ss: %s",
659                         deviceInfo.toString(),
660                         String.valueOf(DEVICE_INIT_TIMEOUT / 1000),
661                         ex.toString()));
662             } catch (ExecutionException | InterruptedException ex) {
663                 throw new RuntimeException(String.format("Device %s cannot be initialized: %s",
664                         deviceInfo.toString(),
665                         ex.toString()));
666             }
667         } else {
668             throw new RuntimeException(String.format("Unsupported version %s for device %s",
669                     deviceInfo.getVersion(),
670                     deviceInfo.toString()));
671         }
672
673         final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill =
674                 getDeviceFlowRegistry().fill();
675         Futures.addCallback(deviceFlowRegistryFill,
676                 new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher));
677     }
678
679     @VisibleForTesting
680     void lazyTransactionManagerInitialization() {
681         if (!this.initialized.get()) {
682             if (LOG.isDebugEnabled()) {
683                 LOG.debug("Transaction chain manager for node {} created", deviceInfo);
684             }
685             this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo);
686             this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker, deviceInfo
687                     .getNodeInstanceIdentifier());
688             this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
689             this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
690         }
691
692         transactionChainManager.activateTransactionManager();
693         initialized.set(true);
694     }
695
696     @Nullable
697     @Override
698     public <T> RequestContext<T> createRequestContext() {
699         final Long xid = deviceInfo.reserveXidForDeviceMessage();
700
701         final AbstractRequestContext<T> abstractRequestContext = new AbstractRequestContext<T>(xid) {
702             @Override
703             public void close() {
704                 requestContexts.remove(this);
705             }
706         };
707
708         requestContexts.add(abstractRequestContext);
709         return abstractRequestContext;
710     }
711
712     @Override
713     public void onStateAcquired(final ContextChainState state) {
714         hasState.set(true);
715     }
716
717     private class DeviceFlowRegistryCallback implements FutureCallback<List<com.google.common.base
718             .Optional<FlowCapableNode>>> {
719         private final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill;
720         private final ContextChainMastershipWatcher contextChainMastershipWatcher;
721
722         DeviceFlowRegistryCallback(
723                 ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill,
724                 ContextChainMastershipWatcher contextChainMastershipWatcher) {
725             this.deviceFlowRegistryFill = deviceFlowRegistryFill;
726             this.contextChainMastershipWatcher = contextChainMastershipWatcher;
727         }
728
729         @Override
730         public void onSuccess(@Nullable List<com.google.common.base.Optional<FlowCapableNode>> result) {
731             if (LOG.isDebugEnabled()) {
732                 // Count all flows we read from datastore for debugging purposes.
733                 // This number do not always represent how many flows were actually added
734                 // to DeviceFlowRegistry, because of possible duplicates.
735                 long flowCount = Optional.ofNullable(result)
736                         .map(Collections::singleton)
737                         .orElse(Collections.emptySet())
738                         .stream()
739                         .flatMap(Collection::stream)
740                         .filter(Objects::nonNull)
741                         .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
742                         .filter(Objects::nonNull)
743                         .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
744                         .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
745                         .filter(Objects::nonNull)
746                         .filter(table -> Objects.nonNull(table.getFlow()))
747                         .flatMap(table -> table.getFlow().stream())
748                         .filter(Objects::nonNull)
749                         .count();
750
751                 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo
752                         );
753             }
754             this.contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState
755                     .INITIAL_FLOW_REGISTRY_FILL);
756         }
757
758         @Override
759         public void onFailure(Throwable throwable) {
760             if (deviceFlowRegistryFill.isCancelled()) {
761                 if (LOG.isDebugEnabled()) {
762                     LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo);
763                 }
764             } else {
765                 LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo
766                         , throwable);
767             }
768             contextChainMastershipWatcher.onNotAbleToStartMastership(
769                     deviceInfo,
770                     "Was not able to fill flow registry on device",
771                     false);
772         }
773     }
774
775 }