Change return type of events
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Verify;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.JdkFutureAdapters;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.TimerTask;
18 import java.util.Collection;
19 import java.util.Collections;
20 import java.util.HashSet;
21 import java.util.List;
22 import java.util.Objects;
23 import java.util.Optional;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.Future;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28 import java.util.concurrent.atomic.AtomicBoolean;
29 import java.util.concurrent.atomic.AtomicReference;
30 import javax.annotation.Nonnull;
31 import javax.annotation.Nullable;
32 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
33 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
34 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
35 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
36 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
37 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
38 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
39 import org.opendaylight.openflowplugin.api.OFConstants;
40 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
42 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
43 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
44 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
45 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
46 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
47 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
48 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
49 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
50 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
51 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
52 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
53 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
54 import org.opendaylight.openflowplugin.api.openflow.registry.ItemLifeCycleRegistry;
55 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
56 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor;
57 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryKey;
58 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
59 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
60 import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleKeeper;
61 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
62 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
63 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
64 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
65 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
66 import org.opendaylight.openflowplugin.extension.api.exception.ConversionException;
67 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
68 import org.opendaylight.openflowplugin.impl.common.ItemLifeCycleSourceImpl;
69 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
70 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
71 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
72 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
73 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
74 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
75 import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
76 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
77 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
78 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
79 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
80 import org.opendaylight.openflowplugin.impl.util.MatchUtil;
81 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
82 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.Match;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
102 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
103 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
104 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
105 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
106 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketIn;
107 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
108 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceivedBuilder;
109 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
110 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
111 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
112 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService;
113 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput;
114 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder;
115 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
116 import org.opendaylight.yangtools.yang.binding.DataContainer;
117 import org.opendaylight.yangtools.yang.binding.DataObject;
118 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
119 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
120 import org.opendaylight.yangtools.yang.common.RpcResult;
121 import org.slf4j.Logger;
122 import org.slf4j.LoggerFactory;
123
124 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper {
125
126     private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
127
128     // TODO: drain factor should be parametrized
129     private static final float REJECTED_DRAIN_FACTOR = 0.25f;
130     // TODO: low water mark factor should be parametrized
131     private static final float LOW_WATERMARK_FACTOR = 0.75f;
132     // TODO: high water mark factor should be parametrized
133     private static final float HIGH_WATERMARK_FACTOR = 0.95f;
134
135     // Timeout in seconds after what we will give up on propagating role
136     private static final int SET_ROLE_TIMEOUT = 10;
137
138     // Timeout in milliseconds after what we will give up on initializing device
139     private static final int DEVICE_INIT_TIMEOUT = 9000;
140
141     // Timeout in milliseconds after what we will give up on closing transaction chain
142     private static final int TX_CHAIN_CLOSE_TIMEOUT = 10000;
143
144     private static final int LOW_WATERMARK = 1000;
145     private static final int HIGH_WATERMARK = 2000;
146
147     private final MultipartWriterProvider writerProvider;
148     private final HashedWheelTimer hashedWheelTimer;
149     private final DeviceState deviceState;
150     private final DataBroker dataBroker;
151     private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
152     private final MessageSpy messageSpy;
153     private final ItemLifeCycleKeeper flowLifeCycleKeeper;
154     private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
155     private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
156     private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
157     private final TranslatorLibrary translatorLibrary;
158     private final ItemLifeCycleRegistry itemLifeCycleSourceRegistry;
159     private final ConvertorExecutor convertorExecutor;
160     private final DeviceInitializerProvider deviceInitializerProvider;
161     private final PacketInRateLimiter packetInLimiter;
162     private final DeviceInfo deviceInfo;
163     private final ConnectionContext primaryConnectionContext;
164     private final boolean skipTableFeatures;
165     private final boolean switchFeaturesMandatory;
166     private final boolean isFlowRemovedNotificationOn;
167     private final boolean useSingleLayerSerialization;
168     private final AtomicBoolean initialized = new AtomicBoolean(false);
169     private final AtomicBoolean hasState = new AtomicBoolean(false);
170     private final AtomicBoolean isInitialTransactionSubmitted = new AtomicBoolean(false);
171     private final AtomicReference<ContextState> state = new AtomicReference<>(ContextState.INITIALIZATION);
172     private NotificationPublishService notificationPublishService;
173     private TransactionChainManager transactionChainManager;
174     private DeviceFlowRegistry deviceFlowRegistry;
175     private DeviceGroupRegistry deviceGroupRegistry;
176     private DeviceMeterRegistry deviceMeterRegistry;
177     private ExtensionConverterProvider extensionConverterProvider;
178     private SalRoleService salRoleService;
179     private ContextChainMastershipWatcher contextChainMastershipWatcher;
180
181     DeviceContextImpl(
182             @Nonnull final ConnectionContext primaryConnectionContext,
183             @Nonnull final DataBroker dataBroker,
184             @Nonnull final MessageSpy messageSpy,
185             @Nonnull final TranslatorLibrary translatorLibrary,
186             final ConvertorExecutor convertorExecutor,
187             final boolean skipTableFeatures,
188             final HashedWheelTimer hashedWheelTimer,
189             final boolean useSingleLayerSerialization,
190             final DeviceInitializerProvider deviceInitializerProvider,
191             final boolean isFlowRemovedNotificationOn,
192             final boolean switchFeaturesMandatory) {
193
194         this.primaryConnectionContext = primaryConnectionContext;
195         this.deviceInfo = primaryConnectionContext.getDeviceInfo();
196         this.hashedWheelTimer = hashedWheelTimer;
197         this.deviceInitializerProvider = deviceInitializerProvider;
198         this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
199         this.switchFeaturesMandatory = switchFeaturesMandatory;
200         this.deviceState = new DeviceStateImpl();
201         this.dataBroker = dataBroker;
202         this.messageSpy = messageSpy;
203
204         this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
205                 /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
206
207         this.translatorLibrary = translatorLibrary;
208         this.portStatusTranslator = translatorLibrary.lookupTranslator(
209                 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
210         this.packetInTranslator = translatorLibrary.lookupTranslator(
211                 new TranslatorKey(deviceInfo.getVersion(), org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
212                         .PacketIn.class.getName()));
213         this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
214                 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
215
216         this.itemLifeCycleSourceRegistry = new ItemLifeCycleRegistryImpl();
217         this.flowLifeCycleKeeper = new ItemLifeCycleSourceImpl();
218         this.itemLifeCycleSourceRegistry.registerLifeCycleSource(flowLifeCycleKeeper);
219         this.convertorExecutor = convertorExecutor;
220         this.skipTableFeatures = skipTableFeatures;
221         this.useSingleLayerSerialization = useSingleLayerSerialization;
222         writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
223     }
224
225     @Override
226     public boolean initialSubmitTransaction() {
227         if (!initialized.get()) {
228             return false;
229         }
230
231         final boolean initialSubmit = transactionChainManager.initialSubmitWriteTransaction();
232         isInitialTransactionSubmitted.set(initialSubmit);
233         return initialSubmit;
234     }
235
236     @Override
237     public DeviceState getDeviceState() {
238         return deviceState;
239     }
240
241     @Override
242     public ReadOnlyTransaction getReadTransaction() {
243         return dataBroker.newReadOnlyTransaction();
244     }
245
246     @Override
247     public boolean isTransactionsEnabled() {
248         return isInitialTransactionSubmitted.get();
249     }
250
251     @Override
252     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
253                                                           final InstanceIdentifier<T> path,
254                                                           final T data){
255         if (initialized.get()) {
256             transactionChainManager.writeToTransaction(store, path, data, false);
257         }
258     }
259
260     @Override
261     public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
262                                                                          final InstanceIdentifier<T> path,
263                                                                          final T data){
264         if (initialized.get()) {
265             transactionChainManager.writeToTransaction(store, path, data, true);
266         }
267     }
268
269     @Override
270     public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store, final InstanceIdentifier<T> path) {
271         if (initialized.get()) {
272             transactionChainManager.addDeleteOperationTotTxChain(store, path);
273         }
274     }
275
276     @Override
277     public boolean submitTransaction() {
278         return initialized.get() && transactionChainManager.submitWriteTransaction();
279     }
280
281     @Override
282     public ConnectionContext getPrimaryConnectionContext() {
283         return primaryConnectionContext;
284     }
285
286     @Override
287     public DeviceFlowRegistry getDeviceFlowRegistry() {
288         return deviceFlowRegistry;
289     }
290
291     @Override
292     public DeviceGroupRegistry getDeviceGroupRegistry() {
293         return deviceGroupRegistry;
294     }
295
296     @Override
297     public DeviceMeterRegistry getDeviceMeterRegistry() {
298         return deviceMeterRegistry;
299     }
300
301     @Override
302     public void processReply(final OfHeader ofHeader) {
303         messageSpy.spyMessage(
304                 ofHeader.getImplementedInterface(),
305                 (ofHeader instanceof Error)
306                         ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
307                         : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
308     }
309
310     @Override
311     public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
312         ofHeaderList.forEach(header -> messageSpy.spyMessage(
313                 header.getImplementedInterface(),
314                 (header instanceof Error)
315                         ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
316                         : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
317     }
318
319     @Override
320     public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
321         //1. translate to general flow (table, priority, match, cookie)
322         final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification =
323                 flowRemovedTranslator.translate(flowRemoved, deviceInfo, null);
324
325         if (isFlowRemovedNotificationOn) {
326             // Trigger off a notification
327             notificationPublishService.offerNotification(flowRemovedNotification);
328         }
329
330         final ItemLifecycleListener itemLifecycleListener = flowLifeCycleKeeper.getItemLifecycleListener();
331         if (itemLifecycleListener != null) {
332             //2. create registry key
333             final FlowRegistryKey flowRegKey = FlowRegistryKeyFactory.create(getDeviceInfo().getVersion(), flowRemovedNotification);
334             //3. lookup flowId
335             final FlowDescriptor flowDescriptor = deviceFlowRegistry.retrieveDescriptor(flowRegKey);
336             //4. if flowId present:
337             if (flowDescriptor != null) {
338                 // a) construct flow path
339                 final KeyedInstanceIdentifier<Flow, FlowKey> flowPath = getDeviceInfo().getNodeInstanceIdentifier()
340                         .augmentation(FlowCapableNode.class)
341                         .child(Table.class, flowDescriptor.getTableKey())
342                         .child(Flow.class, new FlowKey(flowDescriptor.getFlowId()));
343                 // b) notify listener
344                 itemLifecycleListener.onRemoved(flowPath);
345             } else {
346                 LOG.debug("flow id not found: nodeId={} tableId={}, priority={}",
347                         getDeviceInfo().getNodeId(), flowRegKey.getTableId(), flowRemovedNotification.getPriority());
348             }
349         }
350     }
351
352     @Override
353     public void processPortStatusMessage(final PortStatusMessage portStatus) {
354         messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
355
356         if (initialized.get()) {
357             try {
358                 writePortStatusMessage(portStatus);
359                 submitTransaction();
360             } catch (final Exception e) {
361                 LOG.warn("Error processing port status message for port {} on device {}",
362                         portStatus.getPortNo(), getDeviceInfo().getLOGValue(), e);
363             }
364         } else if (!hasState.get()) {
365             primaryConnectionContext.handlePortStatusMessage(portStatus);
366         }
367     }
368
369     private void writePortStatusMessage(final PortStatus portStatusMessage) {
370         final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
371                 .translate(portStatusMessage, getDeviceInfo(), null);
372
373         final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
374                 .getNodeInstanceIdentifier()
375                 .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
376                         .nodeConnectorIdfromDatapathPortNo(
377                                 deviceInfo.getDatapathId(),
378                                 portStatusMessage.getPortNo(),
379                                 OpenflowVersion.get(deviceInfo.getVersion()))));
380
381         if (PortReason.OFPPRADD.equals(portStatusMessage.getReason()) || PortReason.OFPPRMODIFY.equals(portStatusMessage.getReason())) {
382             // because of ADD status node connector has to be created
383             writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
384                     .setKey(iiToNodeConnector.getKey())
385                     .addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build())
386                     .addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector)
387                     .build());
388         } else if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
389             addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
390         }
391     }
392
393     @Override
394     public void processPacketInMessage(final PacketInMessage packetInMessage) {
395         final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
396         handlePacketInMessage(packetReceived, packetInMessage.getImplementedInterface(), packetReceived.getMatch());
397     }
398
399     private void handlePacketInMessage(final PacketIn packetIn,
400                                        final Class<?> implementedInterface,
401                                        final Match match) {
402         messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH);
403         final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
404
405         if (packetIn == null) {
406             LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
407             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
408             return;
409         }
410
411         final OpenflowVersion openflowVersion = OpenflowVersion.get(deviceInfo.getVersion());
412
413         // Try to get ingress from match
414         final NodeConnectorRef nodeConnectorRef = Objects.nonNull(packetIn.getIngress())
415                 ? packetIn.getIngress() : Optional.ofNullable(match)
416                 .map(Match::getInPort)
417                 .map(nodeConnectorId -> InventoryDataServiceUtil
418                         .portNumberfromNodeConnectorId(
419                                 openflowVersion,
420                                 nodeConnectorId))
421                 .map(portNumber -> InventoryDataServiceUtil
422                         .nodeConnectorRefFromDatapathIdPortno(
423                                 deviceInfo.getDatapathId(),
424                                 portNumber,
425                                 openflowVersion))
426                 .orElse(null);
427
428         messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
429
430         if (!packetInLimiter.acquirePermit()) {
431             LOG.debug("Packet limited");
432             // TODO: save packet into emergency slot if possible
433             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
434             return;
435         }
436
437         final ListenableFuture<?> offerNotification = notificationPublishService
438                 .offerNotification(new PacketReceivedBuilder(packetIn)
439                         .setIngress(nodeConnectorRef)
440                         .setMatch(MatchUtil.transformMatch(match,
441                                 org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.packet.received
442                                         .Match.class))
443                         .build());
444
445         if (NotificationPublishService.REJECTED.equals(offerNotification)) {
446             LOG.debug("notification offer rejected");
447             messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
448             packetInLimiter.drainLowWaterMark();
449             packetInLimiter.releasePermit();
450             return;
451         }
452
453         Futures.addCallback(offerNotification, new FutureCallback<Object>() {
454             @Override
455             public void onSuccess(final Object result) {
456                 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
457                 packetInLimiter.releasePermit();
458             }
459
460             @Override
461             public void onFailure(final Throwable t) {
462                 messageSpy.spyMessage(implementedInterface, MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
463                 LOG.debug("notification offer failed: {}", t.getMessage());
464                 LOG.trace("notification offer failed..", t);
465                 packetInLimiter.releasePermit();
466             }
467         });
468     }
469
470     @Override
471     public void processExperimenterMessage(final ExperimenterMessage notification) {
472         // lookup converter
473         final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
474         final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
475                 getDeviceInfo().getVersion(),
476                 (Class<? extends ExperimenterDataOfChoice>) vendorData.getImplementedInterface());
477         final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter = extensionConverterProvider.getMessageConverter(key);
478         if (messageConverter == null) {
479             LOG.warn("custom converter for {}[OF:{}] not found",
480                     notification.getExperimenterDataOfChoice().getImplementedInterface(),
481                     getDeviceInfo().getVersion());
482             return;
483         }
484         // build notification
485         final ExperimenterMessageOfChoice messageOfChoice;
486         try {
487             messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
488             final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new ExperimenterMessageFromDevBuilder()
489                     .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
490                     .setExperimenterMessageOfChoice(messageOfChoice);
491             // publish
492             notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
493         } catch (final ConversionException e) {
494             LOG.error("Conversion of experimenter notification failed", e);
495         }
496     }
497
498     @Override
499     public boolean processAlienMessage(final OfHeader message) {
500         final Class<? extends DataContainer> implementedInterface = message.getImplementedInterface();
501
502         if (Objects.nonNull(implementedInterface) && org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
503                 .rev130709.PacketInMessage.class.equals(implementedInterface)) {
504             final org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709
505                     .PacketInMessage packetInMessage = org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service
506                     .rev130709.PacketInMessage.class.cast(message);
507
508             handlePacketInMessage(packetInMessage, implementedInterface, packetInMessage.getMatch());
509             return true;
510         }
511
512         return false;
513     }
514
515     @Override
516     public TranslatorLibrary oook() {
517         return translatorLibrary;
518     }
519
520     @Override
521     public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
522         this.notificationPublishService = notificationPublishService;
523     }
524
525     @Override
526     public MessageSpy getMessageSpy() {
527         return messageSpy;
528     }
529
530     @Override
531     public void onPublished() {
532         Verify.verify(ContextState.INITIALIZATION.equals(state.get()));
533         state.set(ContextState.WORKING);
534         primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
535     }
536
537     @Override
538     public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>> requestContext) {
539         return new MultiMsgCollectorImpl<>(this, requestContext);
540     }
541
542     @Override
543     public void updatePacketInRateLimit(final long upperBound) {
544         packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound), (int) (HIGH_WATERMARK_FACTOR * upperBound));
545     }
546
547     @Override
548     public ItemLifeCycleRegistry getItemLifeCycleSourceRegistry() {
549         return itemLifeCycleSourceRegistry;
550     }
551
552     @Override
553     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
554         this.extensionConverterProvider = extensionConverterProvider;
555     }
556
557     @Override
558     public ExtensionConverterProvider getExtensionConverterProvider() {
559         return extensionConverterProvider;
560     }
561
562     @VisibleForTesting
563     TransactionChainManager getTransactionChainManager() {
564         return this.transactionChainManager;
565     }
566
567     @Override
568     public ListenableFuture<Void> closeServiceInstance() {
569         LOG.info("Stopping device context cluster services for node {}", deviceInfo.getLOGValue());
570
571         final ListenableFuture<Void> listenableFuture = initialized.get()
572                 ? transactionChainManager.deactivateTransactionManager()
573                 : Futures.immediateFuture(null);
574
575         hashedWheelTimer.newTimeout((t) -> {
576             if (!listenableFuture.isDone() && !listenableFuture.isCancelled()) {
577                 listenableFuture.cancel(true);
578             }
579         }, TX_CHAIN_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS);
580
581         return listenableFuture;
582     }
583
584     @Override
585     public DeviceInfo getDeviceInfo() {
586         return this.deviceInfo;
587     }
588
589     @Override
590     public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher) {
591         this.contextChainMastershipWatcher = contextChainMastershipWatcher;
592     }
593
594     @Nonnull
595     @Override
596     public ServiceGroupIdentifier getIdentifier() {
597         return deviceInfo.getServiceIdentifier();
598     }
599
600     @Override
601     public void close() {
602         if (ContextState.TERMINATION.equals(state)) {
603             if (LOG.isDebugEnabled()) {
604                 LOG.debug("DeviceContext for node {} is already in TERMINATION state.", getDeviceInfo());
605             }
606
607             return;
608         }
609
610         state.set(ContextState.TERMINATION);
611
612         // Close all datastore registries and transactions
613         if (initialized.get()) {
614             initialized.set(false);
615             deviceGroupRegistry.close();
616             deviceFlowRegistry.close();
617             deviceMeterRegistry.close();
618
619             final ListenableFuture<Void> txChainShuttingDown = transactionChainManager.shuttingDown();
620
621             try {
622                 txChainShuttingDown.get(TX_CHAIN_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS);
623             } catch (InterruptedException | ExecutionException | TimeoutException e) {
624                 txChainShuttingDown.cancel(true);
625                 LOG.warn("Failed to shut down transaction chain for device {}: {}", deviceInfo, e);
626             }
627
628             transactionChainManager.close();
629             transactionChainManager = null;
630         }
631
632         requestContexts.forEach(requestContext -> RequestContextUtil
633                 .closeRequestContextWithRpcError(requestContext, "Connection closed."));
634         requestContexts.clear();
635     }
636
637     @Override
638     public boolean canUseSingleLayerSerialization() {
639         return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
640     }
641
642     @Override
643     public void setSalRoleService(@Nonnull SalRoleService salRoleService) {
644         this.salRoleService = salRoleService;
645     }
646
647     @Override
648     public void instantiateServiceInstance() {
649         LOG.info("Starting device context cluster services for node {}", deviceInfo);
650         lazyTransactionManagerInitialization();
651
652         try {
653             final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
654                     .retrieveAndClearPortStatusMessages();
655
656             portStatusMessages.forEach(this::writePortStatusMessage);
657             submitTransaction();
658         } catch (final Exception ex) {
659             throw new RuntimeException(String.format("Error processing port status messages from device %s: %s",
660                     deviceInfo.toString(),
661                     ex.toString()));
662         }
663
664         final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
665                 .lookup(deviceInfo.getVersion());
666
667         if (initializer.isPresent()) {
668             final Future<Void> initialize = initializer
669                     .get()
670                     .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor);
671
672             try {
673                 initialize.get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
674             } catch (TimeoutException ex) {
675                 initialize.cancel(true);
676                 throw new RuntimeException(String.format("Failed to initialize device %s in %ss: %s",
677                         deviceInfo.toString(),
678                         String.valueOf(DEVICE_INIT_TIMEOUT / 1000),
679                         ex.toString()));
680             } catch (ExecutionException | InterruptedException ex) {
681                 throw new RuntimeException(String.format("Device %s cannot be initialized: %s",
682                         deviceInfo.toString(),
683                         ex.toString()));
684             }
685         } else {
686             throw new RuntimeException(String.format("Unsupported version %s for device %s",
687                     deviceInfo.getVersion(),
688                     deviceInfo.toString()));
689         }
690
691         Futures.addCallback(sendRoleChangeToDevice(OfpRole.BECOMEMASTER),
692                 new RpcResultFutureCallback(contextChainMastershipWatcher));
693
694         final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill = getDeviceFlowRegistry().fill();
695         Futures.addCallback(deviceFlowRegistryFill,
696                 new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher));
697     }
698
699     @VisibleForTesting
700     void lazyTransactionManagerInitialization() {
701         if (!this.initialized.get()) {
702             if (LOG.isDebugEnabled()) {
703                 LOG.debug("Transaction chain manager for node {} created", deviceInfo.getLOGValue());
704             }
705             this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo);
706             this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker, deviceInfo.getNodeInstanceIdentifier());
707             this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
708             this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
709         }
710
711         transactionChainManager.activateTransactionManager();
712         initialized.set(true);
713     }
714
715     @Nullable
716     @Override
717     public <T> RequestContext<T> createRequestContext() {
718         final Long xid = deviceInfo.reserveXidForDeviceMessage();
719
720         final AbstractRequestContext<T> abstractRequestContext = new AbstractRequestContext<T>(xid) {
721             @Override
722             public void close() {
723                 requestContexts.remove(this);
724             }
725         };
726
727         requestContexts.add(abstractRequestContext);
728         return abstractRequestContext;
729     }
730
731     private ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole) {
732         if (LOG.isDebugEnabled()) {
733             LOG.debug("Sending new role {} to device {}", newRole, deviceInfo.getNodeId());
734         }
735
736         final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
737
738         if (deviceInfo.getVersion() >= OFConstants.OFP_VERSION_1_3) {
739             final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole)
740                     .setNode(new NodeRef(deviceInfo.getNodeInstanceIdentifier())).build();
741
742             setRoleOutputFuture = this.salRoleService.setRole(setRoleInput);
743
744             final TimerTask timerTask = timeout -> {
745                 if (!setRoleOutputFuture.isDone()) {
746                     LOG.warn("New role {} was not propagated to device {} during {} sec", newRole, deviceInfo.getLOGValue(), SET_ROLE_TIMEOUT);
747                     setRoleOutputFuture.cancel(true);
748                 }
749             };
750
751             hashedWheelTimer.newTimeout(timerTask, SET_ROLE_TIMEOUT, TimeUnit.SECONDS);
752         } else {
753             LOG.info("Device: {} with version: {} does not support role", deviceInfo.getLOGValue(), deviceInfo.getVersion());
754             return Futures.immediateFuture(null);
755         }
756
757         return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture);
758     }
759
760     @Override
761     public ListenableFuture<RpcResult<SetRoleOutput>> makeDeviceSlave() {
762         return sendRoleChangeToDevice(OfpRole.BECOMESLAVE);
763     }
764
765     @Override
766     public void onStateAcquired(final ContextChainState state) {
767         hasState.set(true);
768     }
769
770     private class RpcResultFutureCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
771
772         private final ContextChainMastershipWatcher contextChainMastershipWatcher;
773
774         RpcResultFutureCallback(final ContextChainMastershipWatcher contextChainMastershipWatcher) {
775             this.contextChainMastershipWatcher = contextChainMastershipWatcher;
776         }
777
778         @Override
779         public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
780             this.contextChainMastershipWatcher.onMasterRoleAcquired(
781                     deviceInfo,
782                     ContextChainMastershipState.MASTER_ON_DEVICE
783             );
784             if (LOG.isDebugEnabled()) {
785                 LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo);
786             }
787         }
788
789         @Override
790         public void onFailure(final Throwable throwable) {
791             contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(
792                     deviceInfo,
793                     "Was not able to set MASTER role on device");
794         }
795     }
796
797     private class DeviceFlowRegistryCallback implements FutureCallback<List<com.google.common.base.Optional<FlowCapableNode>>> {
798         private final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill;
799         private final ContextChainMastershipWatcher contextChainMastershipWatcher;
800
801         DeviceFlowRegistryCallback(
802                 ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill,
803                 ContextChainMastershipWatcher contextChainMastershipWatcher) {
804             this.deviceFlowRegistryFill = deviceFlowRegistryFill;
805             this.contextChainMastershipWatcher = contextChainMastershipWatcher;
806         }
807
808         @Override
809         public void onSuccess(@Nullable List<com.google.common.base.Optional<FlowCapableNode>> result) {
810             if (LOG.isDebugEnabled()) {
811                 // Count all flows we read from datastore for debugging purposes.
812                 // This number do not always represent how many flows were actually added
813                 // to DeviceFlowRegistry, because of possible duplicates.
814                 long flowCount = Optional.ofNullable(result)
815                         .map(Collections::singleton)
816                         .orElse(Collections.emptySet())
817                         .stream()
818                         .flatMap(Collection::stream)
819                         .filter(Objects::nonNull)
820                         .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
821                         .filter(Objects::nonNull)
822                         .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
823                         .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
824                         .filter(Objects::nonNull)
825                         .filter(table -> Objects.nonNull(table.getFlow()))
826                         .flatMap(table -> table.getFlow().stream())
827                         .filter(Objects::nonNull)
828                         .count();
829
830                 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo.getLOGValue());
831             }
832             this.contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_FLOW_REGISTRY_FILL);
833         }
834
835         @Override
836         public void onFailure(Throwable t) {
837             if (deviceFlowRegistryFill.isCancelled()) {
838                 if (LOG.isDebugEnabled()) {
839                     LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo.getLOGValue());
840                 }
841             } else {
842                 LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo.getLOGValue(), t);
843             }
844             contextChainMastershipWatcher.onNotAbleToStartMastership(
845                     deviceInfo,
846                     "Was not able to fill flow registry on device",
847                     false);
848         }
849     }
850
851 }