b9ba54865411f0530962a6718973151d43fea283
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import io.netty.util.HashedWheelTimer;
17 import java.util.Collection;
18 import java.util.List;
19 import java.util.Objects;
20 import java.util.Optional;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.eclipse.jdt.annotation.Nullable;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
31 import org.opendaylight.mdsal.binding.api.ReadTransaction;
32 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
33 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
34 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
35 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
36 import org.opendaylight.openflowplugin.api.OFConstants;
37 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
40 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
41 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
42 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
43 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
44 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
45 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
46 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
47 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder;
48 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
49 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
50 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
51 import org.opendaylight.openflowplugin.api.openflow.lifecycle.DeviceInitializationContext;
52 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
53 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
54 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
55 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
56 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
57 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
58 import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
59 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
60 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
61 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
62 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
63 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
64 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
65 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
66 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
67 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
68 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
69 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
70 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
71 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
72 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
73 import org.opendaylight.openflowplugin.impl.util.MatchUtil;
74 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
75 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.Match;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketIn;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
101 import org.opendaylight.yangtools.util.concurrent.NotificationManager;
102 import org.opendaylight.yangtools.yang.binding.DataContainer;
103 import org.opendaylight.yangtools.yang.binding.DataObject;
104 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
105 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
106 import org.slf4j.Logger;
107 import org.slf4j.LoggerFactory;
108
109 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper, DeviceInitializationContext {
110
111     private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
112     private static final Logger OF_EVENT_LOG = LoggerFactory.getLogger("OfEventLog");
113
114     // TODO: drain factor should be parametrized
115     private static final float REJECTED_DRAIN_FACTOR = 0.25f;
116     // TODO: low water mark factor should be parametrized
117     private static final float LOW_WATERMARK_FACTOR = 0.75f;
118     // TODO: high water mark factor should be parametrized
119     private static final float HIGH_WATERMARK_FACTOR = 0.95f;
120
121     // Timeout in milliseconds after what we will give up on initializing device
122     private static final int DEVICE_INIT_TIMEOUT = 9000;
123
124     // Timeout in milliseconds after what we will give up on closing transaction chain
125     private static final int TX_CHAIN_CLOSE_TIMEOUT = 10000;
126
127     private static final int LOW_WATERMARK = 1000;
128     private static final int HIGH_WATERMARK = 2000;
129
130     private final MultipartWriterProvider writerProvider;
131     private final HashedWheelTimer hashedWheelTimer;
132     private final DeviceState deviceState;
133     private final DataBroker dataBroker;
134     private final Collection<RequestContext<?>> requestContexts = ConcurrentHashMap.newKeySet();
135     private final MessageSpy messageSpy;
136     private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
137     private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
138     private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight
139             .flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
140     private final TranslatorLibrary translatorLibrary;
141     private final ConvertorExecutor convertorExecutor;
142     private final DeviceInitializerProvider deviceInitializerProvider;
143     private final PacketInRateLimiter packetInLimiter;
144     private final DeviceInfo deviceInfo;
145     private final ConnectionContext primaryConnectionContext;
146     private final boolean skipTableFeatures;
147     private final boolean switchFeaturesMandatory;
148     private final boolean isFlowRemovedNotificationOn;
149     private final boolean useSingleLayerSerialization;
150     private final AtomicBoolean initialized = new AtomicBoolean(false);
151     private final AtomicBoolean hasState = new AtomicBoolean(false);
152     private final AtomicBoolean isInitialTransactionSubmitted = new AtomicBoolean(false);
153     private final ContextChainHolder contextChainHolder;
154     private NotificationPublishService notificationPublishService;
155     private TransactionChainManager transactionChainManager;
156     private DeviceFlowRegistry deviceFlowRegistry;
157     private DeviceGroupRegistry deviceGroupRegistry;
158     private DeviceMeterRegistry deviceMeterRegistry;
159     private ExtensionConverterProvider extensionConverterProvider;
160     private ContextChainMastershipWatcher contextChainMastershipWatcher;
161     private final NotificationManager<String, Runnable> queuedNotificationManager;
162     private final boolean isStatisticsPollingOn;
163
164     DeviceContextImpl(@NonNull final ConnectionContext primaryConnectionContext,
165                       @NonNull final DataBroker dataBroker,
166                       @NonNull final MessageSpy messageSpy,
167                       @NonNull final TranslatorLibrary translatorLibrary,
168                       final ConvertorExecutor convertorExecutor,
169                       final boolean skipTableFeatures,
170                       final HashedWheelTimer hashedWheelTimer,
171                       final boolean useSingleLayerSerialization,
172                       final DeviceInitializerProvider deviceInitializerProvider,
173                       final boolean isFlowRemovedNotificationOn,
174                       final boolean switchFeaturesMandatory,
175                       final ContextChainHolder contextChainHolder,
176                       final NotificationManager queuedNotificationManager,
177                       final boolean isStatisticsPollingOn) {
178         this.primaryConnectionContext = primaryConnectionContext;
179         this.deviceInfo = primaryConnectionContext.getDeviceInfo();
180         this.hashedWheelTimer = hashedWheelTimer;
181         this.deviceInitializerProvider = deviceInitializerProvider;
182         this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
183         this.switchFeaturesMandatory = switchFeaturesMandatory;
184         this.deviceState = new DeviceStateImpl();
185         this.dataBroker = dataBroker;
186         this.messageSpy = messageSpy;
187         this.isStatisticsPollingOn = isStatisticsPollingOn;
188         this.contextChainHolder = contextChainHolder;
189
190         this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
191                 /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
192
193         this.translatorLibrary = translatorLibrary;
194         this.portStatusTranslator = translatorLibrary.lookupTranslator(
195                 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
196         this.packetInTranslator = translatorLibrary.lookupTranslator(
197                 new TranslatorKey(deviceInfo.getVersion(), org.opendaylight.yang.gen.v1.urn.opendaylight.openflow
198                         .protocol.rev130731
199                         .PacketIn.class.getName()));
200         this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
201                 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
202
203         this.convertorExecutor = convertorExecutor;
204         this.skipTableFeatures = skipTableFeatures;
205         this.useSingleLayerSerialization = useSingleLayerSerialization;
206         this.queuedNotificationManager = queuedNotificationManager;
207         writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
208     }
209
210     @Override
211     public boolean initialSubmitTransaction() {
212         if (!initialized.get()) {
213             return false;
214         }
215
216         final boolean initialSubmit = transactionChainManager.initialSubmitWriteTransaction();
217         isInitialTransactionSubmitted.set(initialSubmit);
218         return initialSubmit;
219     }
220
221     @Override
222     public DeviceState getDeviceState() {
223         return deviceState;
224     }
225
226     @Override
227     public ReadTransaction getReadTransaction() {
228         return dataBroker.newReadOnlyTransaction();
229     }
230
231     @Override
232     public boolean isTransactionsEnabled() {
233         return isInitialTransactionSubmitted.get();
234     }
235
236     @Override
237     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
238                                                           final InstanceIdentifier<T> path,
239                                                           final T data) {
240         if (initialized.get()) {
241             transactionChainManager.writeToTransaction(store, path, data, false);
242         }
243     }
244
245     @Override
246     public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
247                                                                          final InstanceIdentifier<T> path,
248                                                                          final T data) {
249         if (initialized.get()) {
250             transactionChainManager.writeToTransaction(store, path, data, true);
251         }
252     }
253
254     @Override
255     public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store,
256                                                           final InstanceIdentifier<T> path) {
257         if (initialized.get()) {
258             transactionChainManager.addDeleteOperationToTxChain(store, path);
259         }
260     }
261
262     @Override
263     public boolean submitTransaction() {
264         return initialized.get() && transactionChainManager.submitTransaction();
265     }
266
267     @Override
268     public boolean syncSubmitTransaction() {
269         return initialized.get() && transactionChainManager.submitTransaction(true);
270     }
271
272     @Override
273     public ConnectionContext getPrimaryConnectionContext() {
274         return primaryConnectionContext;
275     }
276
277     @Override
278     public DeviceFlowRegistry getDeviceFlowRegistry() {
279         return deviceFlowRegistry;
280     }
281
282     @Override
283     public DeviceGroupRegistry getDeviceGroupRegistry() {
284         return deviceGroupRegistry;
285     }
286
287     @Override
288     public boolean isStatisticsPollingOn() {
289         return isStatisticsPollingOn;
290     }
291
292     @Override
293     public DeviceMeterRegistry getDeviceMeterRegistry() {
294         return deviceMeterRegistry;
295     }
296
297     @Override
298     public void processReply(final OfHeader ofHeader) {
299         messageSpy.spyMessage(
300                 ofHeader.implementedInterface(),
301                 ofHeader instanceof Error
302                         ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
303                         : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
304     }
305
306     @Override
307     public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
308         ofHeaderList.forEach(header -> messageSpy.spyMessage(
309                 header.implementedInterface(),
310                 header instanceof Error
311                         ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
312                         : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
313     }
314
315     @Override
316     public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
317         if (isMasterOfDevice()) {
318             //1. translate to general flow (table, priority, match, cookie)
319             final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819
320                     .FlowRemoved flowRemovedNotification = flowRemovedTranslator
321                     .translate(flowRemoved, deviceInfo, null);
322
323             if (isFlowRemovedNotificationOn) {
324                 // Trigger off a notification
325                 notificationPublishService.offerNotification(flowRemovedNotification);
326             }
327         } else {
328             LOG.debug("Controller is not owner of the device {}, skipping Flow Removed message",
329                     deviceInfo.getLOGValue());
330         }
331     }
332
333     @Override
334     @SuppressWarnings("checkstyle:IllegalCatch")
335     public void processPortStatusMessage(final PortStatusMessage portStatus) {
336         messageSpy.spyMessage(portStatus.implementedInterface(), MessageSpy.StatisticsGroup
337                 .FROM_SWITCH_PUBLISHED_SUCCESS);
338
339         if (initialized.get()) {
340             try {
341                 writePortStatusMessage(portStatus);
342             } catch (final Exception e) {
343                 LOG.warn("Error processing port status message for port {} on device {}",
344                         portStatus.getPortNo(), getDeviceInfo(), e);
345             }
346         } else if (!hasState.get()) {
347             primaryConnectionContext.handlePortStatusMessage(portStatus);
348         }
349     }
350
351     @SuppressWarnings("checkstyle:IllegalCatch")
352     private void writePortStatusMessage(final PortStatus portStatusMessage) {
353         String datapathId = deviceInfo.getDatapathId().toString().intern();
354         queuedNotificationManager.submitNotification(datapathId, () -> {
355             try {
356                 acquireWriteTransactionLock();
357                 final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
358                         .translate(portStatusMessage, getDeviceInfo(), null);
359                 OF_EVENT_LOG.debug("Node Connector Status, Node: {}, PortNumber: {}, PortName: {}, Reason: {}",
360                         deviceInfo.getDatapathId(), portStatusMessage.getPortNo(), portStatusMessage.getName(),
361                         portStatusMessage.getReason());
362
363                 final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
364                         .getNodeInstanceIdentifier()
365                         .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
366                                 .nodeConnectorIdfromDatapathPortNo(
367                                         deviceInfo.getDatapathId(),
368                                         portStatusMessage.getPortNo(),
369                                         OpenflowVersion.get(deviceInfo.getVersion()))));
370
371                 writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
372                         .withKey(iiToNodeConnector.getKey())
373                         .addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new
374                                 FlowCapableNodeConnectorStatisticsDataBuilder().build())
375                         .addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector)
376                         .build());
377                 syncSubmitTransaction();
378                 if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
379                     addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
380                     syncSubmitTransaction();
381                 }
382             } catch (final Exception e) {
383                 LOG.warn("Error processing port status message for port {} on device {}",
384                         portStatusMessage.getPortNo(), datapathId, e);
385             } finally {
386                 releaseWriteTransactionLock();
387             }
388         });
389     }
390
391     @Override
392     public void processPacketInMessage(final PacketInMessage packetInMessage) {
393         if (isMasterOfDevice()) {
394             final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
395             handlePacketInMessage(packetReceived, packetInMessage.implementedInterface(), packetReceived.getMatch());
396         } else {
397             LOG.debug("Controller is not owner of the device {}, skipping packet_in message", deviceInfo.getLOGValue());
398         }
399     }
400
401     private Boolean isMasterOfDevice() {
402         final ContextChain contextChain = contextChainHolder.getContextChain(deviceInfo);
403         boolean result = false;
404         if (contextChain != null) {
405             result = contextChain.isMastered(ContextChainMastershipState.CHECK, false);
406         }
407         return result;
408     }
409
410     private void handlePacketInMessage(final PacketIn packetIn,
411                                        final Class<?> implementedInterface,
412                                        final Match match) {
413         messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH);
414         final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
415
416         if (packetIn == null) {
417             LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
418             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
419             return;
420         }
421
422         final OpenflowVersion openflowVersion = OpenflowVersion.get(deviceInfo.getVersion());
423
424         // Try to get ingress from match
425         final NodeConnectorRef nodeConnectorRef = packetIn.getIngress() != null
426                 ? packetIn.getIngress() : Optional.ofNullable(match)
427                 .map(Match::getInPort)
428                 .map(nodeConnectorId -> InventoryDataServiceUtil
429                         .portNumberfromNodeConnectorId(
430                                 openflowVersion,
431                                 nodeConnectorId))
432                 .map(portNumber -> InventoryDataServiceUtil
433                         .nodeConnectorRefFromDatapathIdPortno(
434                                 deviceInfo.getDatapathId(),
435                                 portNumber,
436                                 openflowVersion))
437                 .orElse(null);
438
439         messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
440
441         if (!packetInLimiter.acquirePermit()) {
442             LOG.debug("Packet limited");
443             // TODO: save packet into emergency slot if possible
444             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
445                     .FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
446             return;
447         }
448
449         final ListenableFuture<?> offerNotification = notificationPublishService
450                 .offerNotification(new PacketReceivedBuilder(packetIn)
451                         .setIngress(nodeConnectorRef)
452                         .setMatch(MatchUtil.transformMatch(match,
453                                 org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.packet.received
454                                         .Match.class))
455                         .build());
456
457         if (NotificationPublishService.REJECTED.equals(offerNotification)) {
458             LOG.debug("notification offer rejected");
459             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
460             packetInLimiter.drainLowWaterMark();
461             packetInLimiter.releasePermit();
462             return;
463         }
464
465         Futures.addCallback(offerNotification, new FutureCallback<Object>() {
466             @Override
467             public void onSuccess(final Object result) {
468                 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
469                 packetInLimiter.releasePermit();
470             }
471
472             @Override
473             public void onFailure(final Throwable throwable) {
474                 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup
475                         .FROM_SWITCH_NOTIFICATION_REJECTED);
476                 LOG.debug("notification offer failed: {}", throwable.getMessage());
477                 LOG.trace("notification offer failed..", throwable);
478                 packetInLimiter.releasePermit();
479             }
480         }, MoreExecutors.directExecutor());
481     }
482
483     @Override
484     public void processExperimenterMessage(final ExperimenterMessage notification) {
485         if (isMasterOfDevice()) {
486             // lookup converter
487             final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
488             final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
489                     getDeviceInfo().getVersion(),
490                     (Class<? extends ExperimenterDataOfChoice>) vendorData.implementedInterface());
491             final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter =
492                     extensionConverterProvider.getMessageConverter(key);
493             if (messageConverter == null) {
494                 LOG.warn("custom converter for {}[OF:{}] not found",
495                         notification.getExperimenterDataOfChoice().implementedInterface(),
496                         getDeviceInfo().getVersion());
497                 return;
498             }
499             // build notification
500             final ExperimenterMessageOfChoice messageOfChoice;
501             messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
502             final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new
503                     ExperimenterMessageFromDevBuilder()
504                     .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
505                     .setExperimenterMessageOfChoice(messageOfChoice);
506             // publish
507             notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
508         } else {
509             LOG.debug("Controller is not owner of the device {}, skipping experimenter message",
510                     deviceInfo.getLOGValue());
511         }
512     }
513
514     @Override
515     // The cast to PacketInMessage is safe as the implemented interface is verified before the cas tbut FB doesn't
516     // recognize it.
517     @SuppressFBWarnings("BC_UNCONFIRMED_CAST")
518     public boolean processAlienMessage(final OfHeader message) {
519         final Class<? extends DataContainer> implementedInterface = message.implementedInterface();
520
521         if (org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketInMessage.class
522                 .equals(implementedInterface)) {
523             final org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
524                     .PacketInMessage packetInMessage = (org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
525                 .rev130709.PacketInMessage) message;
526
527             handlePacketInMessage(packetInMessage, implementedInterface, packetInMessage.getMatch());
528             return true;
529         }
530
531         return false;
532     }
533
534     @Override
535     public TranslatorLibrary oook() {
536         return translatorLibrary;
537     }
538
539     @Override
540     public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
541         this.notificationPublishService = notificationPublishService;
542     }
543
544     @Override
545     public MessageSpy getMessageSpy() {
546         return messageSpy;
547     }
548
549     @Override
550     public void onPublished() {
551         primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
552     }
553
554     @Override
555     public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>>
556                                                                                       requestContext) {
557         return new MultiMsgCollectorImpl<>(this, requestContext);
558     }
559
560     @Override
561     public void updatePacketInRateLimit(final long upperBound) {
562         packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound),
563                 (int) (HIGH_WATERMARK_FACTOR * upperBound));
564     }
565
566     @Override
567     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
568         this.extensionConverterProvider = extensionConverterProvider;
569     }
570
571     @Override
572     public ExtensionConverterProvider getExtensionConverterProvider() {
573         return extensionConverterProvider;
574     }
575
576     @VisibleForTesting
577     TransactionChainManager getTransactionChainManager() {
578         return this.transactionChainManager;
579     }
580
581     @Override
582     public ListenableFuture<?> closeServiceInstance() {
583         final ListenableFuture<?> listenableFuture = initialized.get()
584                 ? transactionChainManager.deactivateTransactionManager()
585                 : Futures.immediateFuture(null);
586
587         hashedWheelTimer.newTimeout((timerTask) -> {
588             if (!listenableFuture.isDone() && !listenableFuture.isCancelled()) {
589                 listenableFuture.cancel(true);
590             }
591         }, TX_CHAIN_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS);
592
593         return listenableFuture;
594     }
595
596     @Override
597     public DeviceInfo getDeviceInfo() {
598         return this.deviceInfo;
599     }
600
601     @Override
602     public void registerMastershipWatcher(@NonNull final ContextChainMastershipWatcher newWatcher) {
603         this.contextChainMastershipWatcher = newWatcher;
604     }
605
606     @NonNull
607     @Override
608     public ServiceGroupIdentifier getIdentifier() {
609         return deviceInfo.getServiceIdentifier();
610     }
611
612     @Override
613     public void acquireWriteTransactionLock() {
614         transactionChainManager.acquireWriteTransactionLock();
615     }
616
617     @Override
618     public void releaseWriteTransactionLock() {
619         transactionChainManager.releaseWriteTransactionLock();
620     }
621
622     @Override
623     public void close() {
624         // Close all datastore registries and transactions
625         if (initialized.getAndSet(false)) {
626             deviceGroupRegistry.close();
627             deviceFlowRegistry.close();
628             deviceMeterRegistry.close();
629
630             final ListenableFuture<?> txChainShuttingDown = transactionChainManager.shuttingDown();
631
632             Futures.addCallback(txChainShuttingDown, new FutureCallback<Object>() {
633                 @Override
634                 public void onSuccess(final Object result) {
635                     transactionChainManager.close();
636                     transactionChainManager = null;
637                 }
638
639                 @Override
640                 public void onFailure(final Throwable throwable) {
641                     transactionChainManager.close();
642                     transactionChainManager = null;
643                 }
644             }, MoreExecutors.directExecutor());
645         }
646
647         requestContexts.forEach(requestContext -> RequestContextUtil
648                 .closeRequestContextWithRpcError(requestContext, "Connection closed."));
649         requestContexts.clear();
650     }
651
652     @Override
653     public boolean canUseSingleLayerSerialization() {
654         return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
655     }
656
657     @Override
658     public void instantiateServiceInstance() {
659         lazyTransactionManagerInitialization();
660     }
661
662     // TODO: exception handling should be fixed by using custom checked exception, never RuntimeExceptions
663     @Override
664     @SuppressWarnings({"checkstyle:IllegalCatch"})
665     public void initializeDevice() {
666         LOG.debug("Device initialization started for device {}", deviceInfo);
667         try {
668             final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
669                     .retrieveAndClearPortStatusMessages();
670             portStatusMessages.forEach(this::writePortStatusMessage);
671             submitTransaction();
672         } catch (final Exception ex) {
673             throw new RuntimeException(String.format("Error processing port status messages from device %s: %s",
674                     deviceInfo.toString(), ex.toString()), ex);
675         }
676
677         final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
678                 .lookup(deviceInfo.getVersion());
679
680         if (initializer.isPresent()) {
681             final Future<Void> initialize = initializer
682                     .get()
683                     .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor);
684
685             try {
686                 initialize.get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
687             } catch (TimeoutException ex) {
688                 initialize.cancel(true);
689                 throw new RuntimeException(String.format("Failed to initialize device %s in %ss: %s",
690                         deviceInfo.toString(), String.valueOf(DEVICE_INIT_TIMEOUT / 1000), ex.toString()), ex);
691             } catch (ExecutionException | InterruptedException ex) {
692                 throw new RuntimeException(
693                         String.format("Device %s cannot be initialized: %s", deviceInfo.toString(), ex.toString()), ex);
694             }
695         } else {
696             throw new RuntimeException(String.format("Unsupported version %s for device %s",
697                     deviceInfo.getVersion(),
698                     deviceInfo.toString()));
699         }
700
701         final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill =
702                 getDeviceFlowRegistry().fill();
703         Futures.addCallback(deviceFlowRegistryFill,
704                 new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher),
705                 MoreExecutors.directExecutor());
706     }
707
708     @VisibleForTesting
709     void lazyTransactionManagerInitialization() {
710         if (!this.initialized.get()) {
711             if (LOG.isDebugEnabled()) {
712                 LOG.debug("Transaction chain manager for node {} created", deviceInfo);
713             }
714             this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo.getNodeId().getValue());
715             this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker,
716                     deviceInfo.getNodeInstanceIdentifier());
717             this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
718             this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
719         }
720
721         transactionChainManager.activateTransactionManager();
722         initialized.set(true);
723     }
724
725     @Nullable
726     @Override
727     public <T> RequestContext<T> createRequestContext() {
728         final Long xid = deviceInfo.reserveXidForDeviceMessage();
729
730         final AbstractRequestContext<T> abstractRequestContext = new AbstractRequestContext<T>(xid) {
731             @Override
732             public void close() {
733                 requestContexts.remove(this);
734             }
735         };
736
737         requestContexts.add(abstractRequestContext);
738         return abstractRequestContext;
739     }
740
741     @Override
742     public void onStateAcquired(final ContextChainState state) {
743         hasState.set(true);
744     }
745
746     private class DeviceFlowRegistryCallback implements FutureCallback<List<Optional<FlowCapableNode>>> {
747         private final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill;
748         private final ContextChainMastershipWatcher contextChainMastershipWatcher;
749
750         DeviceFlowRegistryCallback(
751                 ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill,
752                 ContextChainMastershipWatcher contextChainMastershipWatcher) {
753             this.deviceFlowRegistryFill = deviceFlowRegistryFill;
754             this.contextChainMastershipWatcher = contextChainMastershipWatcher;
755         }
756
757         @Override
758         public void onSuccess(List<Optional<FlowCapableNode>> result) {
759             if (LOG.isDebugEnabled()) {
760                 // Count all flows we read from datastore for debugging purposes.
761                 // This number do not always represent how many flows were actually added
762                 // to DeviceFlowRegistry, because of possible duplicates.
763                 long flowCount = 0;
764                 if (result != null) {
765                     for (Optional<FlowCapableNode> optNode : result) {
766                         if (optNode.isPresent()) {
767                             flowCount += optNode.get().nonnullTable().stream()
768                                     .filter(Objects::nonNull)
769                                     .flatMap(table -> table.nonnullFlow().stream())
770                                     .filter(Objects::nonNull)
771                                     .count();
772                         }
773                     }
774                 }
775
776                 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo);
777             }
778         }
779
780         @Override
781         public void onFailure(Throwable throwable) {
782             if (deviceFlowRegistryFill.isCancelled()) {
783                 if (LOG.isDebugEnabled()) {
784                     LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo);
785                 }
786             } else {
787                 LOG.warn("Failed filling flow registry with flows for node: {}", deviceInfo, throwable);
788             }
789             contextChainMastershipWatcher.onNotAbleToStartMastership(
790                     deviceInfo,
791                     "Was not able to fill flow registry on device",
792                     false);
793         }
794     }
795
796 }