Merge remote-tracking branch 'liblldp/master'
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import io.netty.util.HashedWheelTimer;
16 import java.util.Collection;
17 import java.util.Collections;
18 import java.util.HashSet;
19 import java.util.List;
20 import java.util.Objects;
21 import java.util.Optional;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import javax.annotation.Nonnull;
28 import javax.annotation.Nullable;
29 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
30 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
31 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
32 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
33 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
34 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
35 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
36 import org.opendaylight.openflowplugin.api.OFConstants;
37 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
41 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
42 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
43 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
44 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
45 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
46 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
48 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
49 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
50 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
51 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
52 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
53 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
54 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
55 import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
56 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
57 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
58 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
59 import org.opendaylight.openflowplugin.extension.api.exception.ConversionException;
60 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
61 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
62 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
63 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
64 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
65 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
66 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
67 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
68 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
69 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
70 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
71 import org.opendaylight.openflowplugin.impl.util.MatchUtil;
72 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
73 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.Match;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketIn;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
99 import org.opendaylight.yangtools.yang.binding.DataContainer;
100 import org.opendaylight.yangtools.yang.binding.DataObject;
101 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
102 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
103 import org.slf4j.Logger;
104 import org.slf4j.LoggerFactory;
105
106 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper {
107
108     private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
109
110     // TODO: drain factor should be parametrized
111     private static final float REJECTED_DRAIN_FACTOR = 0.25f;
112     // TODO: low water mark factor should be parametrized
113     private static final float LOW_WATERMARK_FACTOR = 0.75f;
114     // TODO: high water mark factor should be parametrized
115     private static final float HIGH_WATERMARK_FACTOR = 0.95f;
116
117     // Timeout in milliseconds after which we give up on initializing the device
118     private static final int DEVICE_INIT_TIMEOUT = 9000;
119
120     // Timeout in milliseconds after which we give up on closing the transaction chain
121     private static final int TX_CHAIN_CLOSE_TIMEOUT = 10000;
122
123     private static final int LOW_WATERMARK = 1000;
124     private static final int HIGH_WATERMARK = 2000;
125
126     private final MultipartWriterProvider writerProvider;
127     private final HashedWheelTimer hashedWheelTimer;
128     private final DeviceState deviceState;
129     private final DataBroker dataBroker;
130     private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
131     private final MessageSpy messageSpy;
132     private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
133     private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
134     private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight
135             .flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
136     private final TranslatorLibrary translatorLibrary;
137     private final ConvertorExecutor convertorExecutor;
138     private final DeviceInitializerProvider deviceInitializerProvider;
139     private final PacketInRateLimiter packetInLimiter;
140     private final DeviceInfo deviceInfo;
141     private final ConnectionContext primaryConnectionContext;
142     private final boolean skipTableFeatures;
143     private final boolean switchFeaturesMandatory;
144     private final boolean isFlowRemovedNotificationOn;
145     private final boolean useSingleLayerSerialization;
146     private final AtomicBoolean initialized = new AtomicBoolean(false);
147     private final AtomicBoolean hasState = new AtomicBoolean(false);
148     private final AtomicBoolean isInitialTransactionSubmitted = new AtomicBoolean(false);
149     private NotificationPublishService notificationPublishService;
150     private TransactionChainManager transactionChainManager;
151     private DeviceFlowRegistry deviceFlowRegistry;
152     private DeviceGroupRegistry deviceGroupRegistry;
153     private DeviceMeterRegistry deviceMeterRegistry;
154     private ExtensionConverterProvider extensionConverterProvider;
155     private ContextChainMastershipWatcher contextChainMastershipWatcher;
156
157     DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
158                       @Nonnull final DataBroker dataBroker,
159                       @Nonnull final MessageSpy messageSpy,
160                       @Nonnull final TranslatorLibrary translatorLibrary,
161                       final ConvertorExecutor convertorExecutor,
162                       final boolean skipTableFeatures,
163                       final HashedWheelTimer hashedWheelTimer,
164                       final boolean useSingleLayerSerialization,
165                       final DeviceInitializerProvider deviceInitializerProvider,
166                       final boolean isFlowRemovedNotificationOn,
167                       final boolean switchFeaturesMandatory) {
168
169         this.primaryConnectionContext = primaryConnectionContext;
170         this.deviceInfo = primaryConnectionContext.getDeviceInfo();
171         this.hashedWheelTimer = hashedWheelTimer;
172         this.deviceInitializerProvider = deviceInitializerProvider;
173         this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
174         this.switchFeaturesMandatory = switchFeaturesMandatory;
175         this.deviceState = new DeviceStateImpl();
176         this.dataBroker = dataBroker;
177         this.messageSpy = messageSpy;
178
179         this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
180                 /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
181
182         this.translatorLibrary = translatorLibrary;
183         this.portStatusTranslator = translatorLibrary.lookupTranslator(
184                 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
185         this.packetInTranslator = translatorLibrary.lookupTranslator(
186                 new TranslatorKey(deviceInfo.getVersion(), org.opendaylight.yang.gen.v1.urn.opendaylight.openflow
187                         .protocol.rev130731
188                         .PacketIn.class.getName()));
189         this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
190                 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
191
192         this.convertorExecutor = convertorExecutor;
193         this.skipTableFeatures = skipTableFeatures;
194         this.useSingleLayerSerialization = useSingleLayerSerialization;
195         writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
196     }
197
198     @Override
199     public boolean initialSubmitTransaction() {
200         if (!initialized.get()) {
201             return false;
202         }
203
204         final boolean initialSubmit = transactionChainManager.initialSubmitWriteTransaction();
205         isInitialTransactionSubmitted.set(initialSubmit);
206         return initialSubmit;
207     }
208
209     @Override
210     public DeviceState getDeviceState() {
211         return deviceState;
212     }
213
214     @Override
215     public ReadOnlyTransaction getReadTransaction() {
216         return dataBroker.newReadOnlyTransaction();
217     }
218
219     @Override
220     public boolean isTransactionsEnabled() {
221         return isInitialTransactionSubmitted.get();
222     }
223
224     @Override
225     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
226                                                           final InstanceIdentifier<T> path,
227                                                           final T data) {
228         if (initialized.get()) {
229             transactionChainManager.writeToTransaction(store, path, data, false);
230         }
231     }
232
233     @Override
234     public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
235                                                                          final InstanceIdentifier<T> path,
236                                                                          final T data) {
237         if (initialized.get()) {
238             transactionChainManager.writeToTransaction(store, path, data, true);
239         }
240     }
241
242     @Override
243     public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store,
244                                                           final InstanceIdentifier<T> path) {
245         if (initialized.get()) {
246             transactionChainManager.addDeleteOperationToTxChain(store, path);
247         }
248     }
249
250     @Override
251     public boolean submitTransaction() {
252         return initialized.get() && transactionChainManager.submitTransaction();
253     }
254
255     @Override
256     public ConnectionContext getPrimaryConnectionContext() {
257         return primaryConnectionContext;
258     }
259
260     @Override
261     public DeviceFlowRegistry getDeviceFlowRegistry() {
262         return deviceFlowRegistry;
263     }
264
265     @Override
266     public DeviceGroupRegistry getDeviceGroupRegistry() {
267         return deviceGroupRegistry;
268     }
269
270     @Override
271     public DeviceMeterRegistry getDeviceMeterRegistry() {
272         return deviceMeterRegistry;
273     }
274
275     @Override
276     public void processReply(final OfHeader ofHeader) {
277         messageSpy.spyMessage(
278                 ofHeader.getImplementedInterface(),
279                 ofHeader instanceof Error
280                         ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
281                         : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
282     }
283
284     @Override
285     public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
286         ofHeaderList.forEach(header -> messageSpy.spyMessage(
287                 header.getImplementedInterface(),
288                 header instanceof Error
289                         ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
290                         : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
291     }
292
293     @Override
294     public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
295         //1. translate to general flow (table, priority, match, cookie)
296         final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification =
297                 flowRemovedTranslator.translate(flowRemoved, deviceInfo, null);
298
299         if (isFlowRemovedNotificationOn) {
300             // Trigger off a notification
301             notificationPublishService.offerNotification(flowRemovedNotification);
302         }
303     }
304
305     @Override
306     @SuppressWarnings("checkstyle:IllegalCatch")
307     public void processPortStatusMessage(final PortStatusMessage portStatus) {
308         messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.StatisticsGroup
309                 .FROM_SWITCH_PUBLISHED_SUCCESS);
310
311         if (initialized.get()) {
312             try {
313                 writePortStatusMessage(portStatus);
314                 submitTransaction();
315             } catch (final Exception e) {
316                 LOG.warn("Error processing port status message for port {} on device {}",
317                         portStatus.getPortNo(), getDeviceInfo(), e);
318             }
319         } else if (!hasState.get()) {
320             primaryConnectionContext.handlePortStatusMessage(portStatus);
321         }
322     }
323
324     private void writePortStatusMessage(final PortStatus portStatusMessage) {
325         final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
326                 .translate(portStatusMessage, getDeviceInfo(), null);
327
328         final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
329                 .getNodeInstanceIdentifier()
330                 .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
331                         .nodeConnectorIdfromDatapathPortNo(
332                                 deviceInfo.getDatapathId(),
333                                 portStatusMessage.getPortNo(),
334                                 OpenflowVersion.get(deviceInfo.getVersion()))));
335
336         if (PortReason.OFPPRADD.equals(portStatusMessage.getReason())
337                 || PortReason.OFPPRMODIFY.equals(portStatusMessage.getReason())) {
338             // because of ADD status node connector has to be created
339             writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
340                     .setKey(iiToNodeConnector.getKey())
341                     .addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new
342                             FlowCapableNodeConnectorStatisticsDataBuilder().build())
343                     .addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector)
344                     .build());
345         } else if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
346             addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
347         }
348     }
349
350     @Override
351     public void processPacketInMessage(final PacketInMessage packetInMessage) {
352         final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
353         handlePacketInMessage(packetReceived, packetInMessage.getImplementedInterface(), packetReceived.getMatch());
354     }
355
356     private void handlePacketInMessage(final PacketIn packetIn,
357                                        final Class<?> implementedInterface,
358                                        final Match match) {
359         messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH);
360         final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
361
362         if (packetIn == null) {
363             LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
364             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
365             return;
366         }
367
368         final OpenflowVersion openflowVersion = OpenflowVersion.get(deviceInfo.getVersion());
369
370         // Try to get ingress from match
371         final NodeConnectorRef nodeConnectorRef = Objects.nonNull(packetIn.getIngress())
372                 ? packetIn.getIngress() : Optional.ofNullable(match)
373                 .map(Match::getInPort)
374                 .map(nodeConnectorId -> InventoryDataServiceUtil
375                         .portNumberfromNodeConnectorId(
376                                 openflowVersion,
377                                 nodeConnectorId))
378                 .map(portNumber -> InventoryDataServiceUtil
379                         .nodeConnectorRefFromDatapathIdPortno(
380                                 deviceInfo.getDatapathId(),
381                                 portNumber,
382                                 openflowVersion))
383                 .orElse(null);
384
385         messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
386
387         if (!packetInLimiter.acquirePermit()) {
388             LOG.debug("Packet limited");
389             // TODO: save packet into emergency slot if possible
390             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
391                     .FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
392             return;
393         }
394
395         final ListenableFuture<?> offerNotification = notificationPublishService
396                 .offerNotification(new PacketReceivedBuilder(packetIn)
397                         .setIngress(nodeConnectorRef)
398                         .setMatch(MatchUtil.transformMatch(match,
399                                 org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.packet.received
400                                         .Match.class))
401                         .build());
402
403         if (NotificationPublishService.REJECTED.equals(offerNotification)) {
404             LOG.debug("notification offer rejected");
405             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
406             packetInLimiter.drainLowWaterMark();
407             packetInLimiter.releasePermit();
408             return;
409         }
410
411         Futures.addCallback(offerNotification, new FutureCallback<Object>() {
412             @Override
413             public void onSuccess(final Object result) {
414                 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
415                 packetInLimiter.releasePermit();
416             }
417
418             @Override
419             public void onFailure(final Throwable throwable) {
420                 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
421                         .FROM_SWITCH_NOTIFICATION_REJECTED);
422                 LOG.debug("notification offer failed: {}", throwable.getMessage());
423                 LOG.trace("notification offer failed..", throwable);
424                 packetInLimiter.releasePermit();
425             }
426         }, MoreExecutors.directExecutor());
427     }
428
429     @Override
430     public void processExperimenterMessage(final ExperimenterMessage notification) {
431         // lookup converter
432         final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
433         final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
434                 getDeviceInfo().getVersion(),
435                 (Class<? extends ExperimenterDataOfChoice>) vendorData.getImplementedInterface());
436         final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter =
437                 extensionConverterProvider.getMessageConverter(key);
438         if (messageConverter == null) {
439             LOG.warn("custom converter for {}[OF:{}] not found",
440                     notification.getExperimenterDataOfChoice().getImplementedInterface(),
441                     getDeviceInfo().getVersion());
442             return;
443         }
444         // build notification
445         final ExperimenterMessageOfChoice messageOfChoice;
446         try {
447             messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
448             final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new
449                     ExperimenterMessageFromDevBuilder()
450                     .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
451                     .setExperimenterMessageOfChoice(messageOfChoice);
452             // publish
453             notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
454         } catch (final ConversionException e) {
455             LOG.error("Conversion of experimenter notification failed", e);
456         }
457     }
458
459     @Override
460     public boolean processAlienMessage(final OfHeader message) {
461         final Class<? extends DataContainer> implementedInterface = message.getImplementedInterface();
462
463         if (Objects.nonNull(implementedInterface) && org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
464                 .rev130709.PacketInMessage.class.equals(implementedInterface)) {
465             final org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
466                     .PacketInMessage packetInMessage = org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
467                     .rev130709.PacketInMessage.class.cast(message);
468
469             handlePacketInMessage(packetInMessage, implementedInterface, packetInMessage.getMatch());
470             return true;
471         }
472
473         return false;
474     }
475
476     @Override
477     public TranslatorLibrary oook() {
478         return translatorLibrary;
479     }
480
481     @Override
482     public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
483         this.notificationPublishService = notificationPublishService;
484     }
485
486     @Override
487     public MessageSpy getMessageSpy() {
488         return messageSpy;
489     }
490
491     @Override
492     public void onPublished() {
493         primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
494     }
495
496     @Override
497     public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>>
498                                                                                       requestContext) {
499         return new MultiMsgCollectorImpl<>(this, requestContext);
500     }
501
502     @Override
503     public void updatePacketInRateLimit(final long upperBound) {
504         packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound),
505                 (int) (HIGH_WATERMARK_FACTOR * upperBound));
506     }
507
508     @Override
509     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
510         this.extensionConverterProvider = extensionConverterProvider;
511     }
512
513     @Override
514     public ExtensionConverterProvider getExtensionConverterProvider() {
515         return extensionConverterProvider;
516     }
517
518     @VisibleForTesting
519     TransactionChainManager getTransactionChainManager() {
520         return this.transactionChainManager;
521     }
522
523     @Override
524     public ListenableFuture<Void> closeServiceInstance() {
525         final ListenableFuture<Void> listenableFuture = initialized.get()
526                 ? transactionChainManager.deactivateTransactionManager()
527                 : Futures.immediateFuture(null);
528
529         hashedWheelTimer.newTimeout((timerTask) -> {
530             if (!listenableFuture.isDone() && !listenableFuture.isCancelled()) {
531                 listenableFuture.cancel(true);
532             }
533         }, TX_CHAIN_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS);
534
535         return listenableFuture;
536     }
537
538     @Override
539     public DeviceInfo getDeviceInfo() {
540         return this.deviceInfo;
541     }
542
543     @Override
544     public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher newWatcher) {
545         this.contextChainMastershipWatcher = newWatcher;
546     }
547
548     @Nonnull
549     @Override
550     public ServiceGroupIdentifier getIdentifier() {
551         return deviceInfo.getServiceIdentifier();
552     }
553
554     @Override
555     public void close() {
556         // Close all datastore registries and transactions
557         if (initialized.getAndSet(false)) {
558             deviceGroupRegistry.close();
559             deviceFlowRegistry.close();
560             deviceMeterRegistry.close();
561
562             final ListenableFuture<Void> txChainShuttingDown = transactionChainManager.shuttingDown();
563
564             Futures.addCallback(txChainShuttingDown, new FutureCallback<Void>() {
565                 @Override
566                 public void onSuccess(@Nullable final Void result) {
567                     transactionChainManager.close();
568                     transactionChainManager = null;
569                 }
570
571                 @Override
572                 public void onFailure(final Throwable throwable) {
573                     transactionChainManager.close();
574                     transactionChainManager = null;
575                 }
576             }, MoreExecutors.directExecutor());
577         }
578
579         requestContexts.forEach(requestContext -> RequestContextUtil
580                 .closeRequestContextWithRpcError(requestContext, "Connection closed."));
581         requestContexts.clear();
582     }
583
584     @Override
585     public boolean canUseSingleLayerSerialization() {
586         return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
587     }
588
589     // TODO: exception handling should be fixed by using custom checked exception, never RuntimeExceptions
590     @Override
591     @SuppressWarnings({"checkstyle:IllegalCatch"})
592     public void instantiateServiceInstance() {
593         lazyTransactionManagerInitialization();
594
595         try {
596             final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
597                     .retrieveAndClearPortStatusMessages();
598
599             portStatusMessages.forEach(this::writePortStatusMessage);
600             submitTransaction();
601         } catch (final Exception ex) {
602             throw new RuntimeException(String.format("Error processing port status messages from device %s: %s",
603                     deviceInfo.toString(), ex.toString()), ex);
604         }
605
606         final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
607                 .lookup(deviceInfo.getVersion());
608
609         if (initializer.isPresent()) {
610             final Future<Void> initialize = initializer
611                     .get()
612                     .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor);
613
614             try {
615                 initialize.get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
616             } catch (TimeoutException ex) {
617                 initialize.cancel(true);
618                 throw new RuntimeException(String.format("Failed to initialize device %s in %ss: %s",
619                         deviceInfo.toString(), String.valueOf(DEVICE_INIT_TIMEOUT / 1000), ex.toString()), ex);
620             } catch (ExecutionException | InterruptedException ex) {
621                 throw new RuntimeException(
622                         String.format("Device %s cannot be initialized: %s", deviceInfo.toString(), ex.toString()), ex);
623             }
624         } else {
625             throw new RuntimeException(String.format("Unsupported version %s for device %s",
626                     deviceInfo.getVersion(),
627                     deviceInfo.toString()));
628         }
629
630         final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill =
631                 getDeviceFlowRegistry().fill();
632         Futures.addCallback(deviceFlowRegistryFill,
633                 new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher),
634                 MoreExecutors.directExecutor());
635     }
636
637     @VisibleForTesting
638     void lazyTransactionManagerInitialization() {
639         if (!this.initialized.get()) {
640             if (LOG.isDebugEnabled()) {
641                 LOG.debug("Transaction chain manager for node {} created", deviceInfo);
642             }
643             this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo.getNodeId().getValue());
644             this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker,
645                     deviceInfo.getNodeInstanceIdentifier());
646             this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
647             this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
648         }
649
650         transactionChainManager.activateTransactionManager();
651         initialized.set(true);
652     }
653
654     @Nullable
655     @Override
656     public <T> RequestContext<T> createRequestContext() {
657         final Long xid = deviceInfo.reserveXidForDeviceMessage();
658
659         final AbstractRequestContext<T> abstractRequestContext = new AbstractRequestContext<T>(xid) {
660             @Override
661             public void close() {
662                 requestContexts.remove(this);
663             }
664         };
665
666         requestContexts.add(abstractRequestContext);
667         return abstractRequestContext;
668     }
669
670     @Override
671     public void onStateAcquired(final ContextChainState state) {
672         hasState.set(true);
673     }
674
675     private class DeviceFlowRegistryCallback implements FutureCallback<List<com.google.common.base
676             .Optional<FlowCapableNode>>> {
677         private final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill;
678         private final ContextChainMastershipWatcher contextChainMastershipWatcher;
679
680         DeviceFlowRegistryCallback(
681                 ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill,
682                 ContextChainMastershipWatcher contextChainMastershipWatcher) {
683             this.deviceFlowRegistryFill = deviceFlowRegistryFill;
684             this.contextChainMastershipWatcher = contextChainMastershipWatcher;
685         }
686
687         @Override
688         public void onSuccess(@Nullable List<com.google.common.base.Optional<FlowCapableNode>> result) {
689             if (LOG.isDebugEnabled()) {
690                 // Count all flows we read from datastore for debugging purposes.
691                 // This number do not always represent how many flows were actually added
692                 // to DeviceFlowRegistry, because of possible duplicates.
693                 long flowCount = Optional.ofNullable(result)
694                         .map(Collections::singleton)
695                         .orElse(Collections.emptySet())
696                         .stream()
697                         .flatMap(Collection::stream)
698                         .filter(Objects::nonNull)
699                         .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
700                         .filter(Objects::nonNull)
701                         .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
702                         .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
703                         .filter(Objects::nonNull)
704                         .filter(table -> Objects.nonNull(table.getFlow()))
705                         .flatMap(table -> table.getFlow().stream())
706                         .filter(Objects::nonNull)
707                         .count();
708
709                 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo);
710             }
711             this.contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState
712                     .INITIAL_FLOW_REGISTRY_FILL);
713         }
714
715         @Override
716         public void onFailure(Throwable throwable) {
717             if (deviceFlowRegistryFill.isCancelled()) {
718                 if (LOG.isDebugEnabled()) {
719                     LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo);
720                 }
721             } else {
722                 LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo,
723                          throwable);
724             }
725             contextChainMastershipWatcher.onNotAbleToStartMastership(
726                     deviceInfo,
727                     "Was not able to fill flow registry on device",
728                     false);
729         }
730     }
731
732 }