Bug 5596 Cleaning part 1
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import io.netty.util.Timeout;
17 import java.math.BigInteger;
18 import java.util.HashMap;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.concurrent.ExecutionException;
22 import javax.annotation.Nonnull;
23 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
24 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
25 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
26 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
27 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
28 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
29 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
30 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
31 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
32 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
33 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
34 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
35 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
36 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
38 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
39 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
40 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
41 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
42 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
43 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
44 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
45 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
46 import org.opendaylight.openflowplugin.api.openflow.registry.ItemLifeCycleRegistry;
47 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
48 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor;
49 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryKey;
50 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
51 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
52 import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleKeeper;
53 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
54 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
55 import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
56 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
57 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
58 import org.opendaylight.openflowplugin.extension.api.exception.ConversionException;
59 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
60 import org.opendaylight.openflowplugin.impl.common.ItemLifeCycleSourceImpl;
61 import org.opendaylight.openflowplugin.impl.common.NodeStaticReplyTranslatorUtil;
62 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
63 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
64 import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
65 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
66 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
67 import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
68 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
69 import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketIn;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
97 import org.opendaylight.yangtools.yang.binding.DataObject;
98 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
99 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
100 import org.slf4j.Logger;
101 import org.slf4j.LoggerFactory;
102
103 /**
104  *
105  */
106 public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper{
107
108     private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
109
110     // TODO: drain factor should be parametrized
111     private static final float REJECTED_DRAIN_FACTOR = 0.25f;
112     // TODO: low water mark factor should be parametrized
113     private static final float LOW_WATERMARK_FACTOR = 0.75f;
114     // TODO: high water mark factor should be parametrized
115     private static final float HIGH_WATERMARK_FACTOR = 0.95f;
116
117     private final ConnectionContext primaryConnectionContext;
118     private final DeviceState deviceState;
119     private final DataBroker dataBroker;
120     private final Map<SwitchConnectionDistinguisher, ConnectionContext> auxiliaryConnectionContexts;
121     private final TransactionChainManager transactionChainManager;
122     private final DeviceFlowRegistry deviceFlowRegistry;
123     private final DeviceGroupRegistry deviceGroupRegistry;
124     private final DeviceMeterRegistry deviceMeterRegistry;
125     private final PacketInRateLimiter packetInLimiter;
126     private final MessageSpy messageSpy;
127     private final ItemLifeCycleKeeper flowLifeCycleKeeper;
128     private NotificationPublishService notificationPublishService;
129     private final OutboundQueue outboundQueueProvider;
130     private Timeout barrierTaskTimeout;
131     private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
132     private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
133     private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
134     private final TranslatorLibrary translatorLibrary;
135     private final ItemLifeCycleRegistry itemLifeCycleSourceRegistry;
136     private ExtensionConverterProvider extensionConverterProvider;
137     private final DeviceManager deviceManager;
138
139     private boolean switchFeaturesMandatory;
140     private final DeviceInfo deviceInfo;
141     private final ConvertorExecutor convertorExecutor;
142     private volatile CONTEXT_STATE state;
143
144     public DeviceContextImpl(
145             @Nonnull final ConnectionContext primaryConnectionContext,
146             @Nonnull final DataBroker dataBroker,
147             @Nonnull final LifecycleConductor conductor,
148             @Nonnull final OutboundQueueProvider outboundQueueProvider,
149             @Nonnull final TranslatorLibrary translatorLibrary,
150             @Nonnull final DeviceManager manager,
151             final ConvertorExecutor convertorExecutor) {
152         this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext);
153         this.deviceInfo = primaryConnectionContext.getDeviceInfo();
154         this.deviceState = new DeviceStateImpl();
155         this.dataBroker = Preconditions.checkNotNull(dataBroker);
156         Preconditions.checkNotNull(conductor);
157         this.outboundQueueProvider = Preconditions.checkNotNull(outboundQueueProvider);
158         this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo , conductor);
159         auxiliaryConnectionContexts = new HashMap<>();
160         deviceFlowRegistry = new DeviceFlowRegistryImpl(dataBroker, deviceInfo.getNodeInstanceIdentifier());
161         deviceGroupRegistry = new DeviceGroupRegistryImpl();
162         deviceMeterRegistry = new DeviceMeterRegistryImpl();
163         messageSpy = conductor.getMessageIntelligenceAgency();
164         this.deviceManager = Preconditions.checkNotNull(manager);
165
166         packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
167                 /*initial*/ 1000, /*initial*/2000, messageSpy, REJECTED_DRAIN_FACTOR);
168
169         this.translatorLibrary = translatorLibrary;
170         portStatusTranslator = translatorLibrary.lookupTranslator(
171                 new TranslatorKey(deviceInfo.getVersion(), PortGrouping.class.getName()));
172         packetInTranslator = translatorLibrary.lookupTranslator(
173                 new TranslatorKey(deviceInfo.getVersion(), PacketIn.class.getName()));
174         flowRemovedTranslator = translatorLibrary.lookupTranslator(
175                 new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
176
177         itemLifeCycleSourceRegistry = new ItemLifeCycleRegistryImpl();
178         flowLifeCycleKeeper = new ItemLifeCycleSourceImpl();
179         itemLifeCycleSourceRegistry.registerLifeCycleSource(flowLifeCycleKeeper);
180         this.state = CONTEXT_STATE.INITIALIZATION;
181         this.convertorExecutor = convertorExecutor;
182     }
183
184     /**
185      * This method is called from {@link DeviceManagerImpl} only. So we could say "posthandshake process finish"
186      * and we are able to set a scheduler for an automatic transaction submitting by time (0,5sec).
187      */
188     @Override
189     public void initialSubmitTransaction() {
190         transactionChainManager.initialSubmitWriteTransaction();
191     }
192
193     @Override
194     public Long reserveXidForDeviceMessage() {
195         return outboundQueueProvider.reserveEntry();
196     }
197
198     @Override
199     public void addAuxiliaryConnectionContext(final ConnectionContext connectionContext) {
200         final SwitchConnectionDistinguisher connectionDistinguisher = createConnectionDistinguisher(connectionContext);
201         auxiliaryConnectionContexts.put(connectionDistinguisher, connectionContext);
202     }
203
204     private static SwitchConnectionDistinguisher createConnectionDistinguisher(final ConnectionContext connectionContext) {
205         return new SwitchConnectionCookieOFImpl(connectionContext.getFeatures().getAuxiliaryId());
206     }
207
208     @Override
209     public void removeAuxiliaryConnectionContext(final ConnectionContext connectionContext) {
210         final SwitchConnectionDistinguisher connectionDistinguisher = createConnectionDistinguisher(connectionContext);
211         LOG.debug("auxiliary connection dropped: {}, nodeId:{}", connectionContext.getConnectionAdapter()
212                 .getRemoteAddress(), getDeviceInfo().getNodeId());
213         auxiliaryConnectionContexts.remove(connectionDistinguisher);
214     }
215
216     @Override
217     public DeviceState getDeviceState() {
218         return deviceState;
219     }
220
221     @Override
222     public ReadOnlyTransaction getReadTransaction() {
223         return dataBroker.newReadOnlyTransaction();
224     }
225
226     @Override
227     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
228                                                           final InstanceIdentifier<T> path,
229                                                           final T data){
230         transactionChainManager.writeToTransaction(store, path, data, false);
231     }
232
233     @Override
234     public <T extends DataObject> void writeToTransactionWithParentsSlow(final LogicalDatastoreType store,
235                                                                          final InstanceIdentifier<T> path,
236                                                                          final T data){
237         transactionChainManager.writeToTransaction(store, path, data, true);
238     }
239
240     @Override
241     public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store, final InstanceIdentifier<T> path) throws TransactionChainClosedException {
242         transactionChainManager.addDeleteOperationTotTxChain(store, path);
243     }
244
245     @Override
246     public boolean submitTransaction() {
247         return transactionChainManager.submitWriteTransaction();
248     }
249
250     @Override
251     public ConnectionContext getPrimaryConnectionContext() {
252         return primaryConnectionContext;
253     }
254
255     @Override
256     public ConnectionContext getAuxiliaryConnectiobContexts(final BigInteger cookie) {
257         return auxiliaryConnectionContexts.get(new SwitchConnectionCookieOFImpl(cookie.longValue()));
258     }
259
260     @Override
261     public DeviceFlowRegistry getDeviceFlowRegistry() {
262         return deviceFlowRegistry;
263     }
264
265     @Override
266     public DeviceGroupRegistry getDeviceGroupRegistry() {
267         return deviceGroupRegistry;
268     }
269
270     @Override
271     public DeviceMeterRegistry getDeviceMeterRegistry() {
272         return deviceMeterRegistry;
273     }
274
275     @Override
276     public void processReply(final OfHeader ofHeader) {
277         if (ofHeader instanceof Error) {
278             messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
279         } else {
280             messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
281         }
282     }
283
284     @Override
285     public void processReply(final Xid xid, final List<MultipartReply> ofHeaderList) {
286         for (final MultipartReply multipartReply : ofHeaderList) {
287             messageSpy.spyMessage(multipartReply.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
288         }
289     }
290
291     @Override
292     public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
293         //1. translate to general flow (table, priority, match, cookie)
294         final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification =
295                 flowRemovedTranslator.translate(flowRemoved, deviceInfo, null);
296
297         if(!deviceManager.getIsNotificationFlowRemovedOff()) {
298             // Trigger off a notification
299             notificationPublishService.offerNotification(flowRemovedNotification);
300         } else if(LOG.isDebugEnabled()) {
301             LOG.debug("For nodeId={} isNotificationFlowRemovedOff={}", getDeviceInfo().getNodeId(), deviceManager.getIsNotificationFlowRemovedOff());
302         }
303
304         final ItemLifecycleListener itemLifecycleListener = flowLifeCycleKeeper.getItemLifecycleListener();
305         if (itemLifecycleListener != null) {
306             //2. create registry key
307             final FlowRegistryKey flowRegKey = FlowRegistryKeyFactory.create(flowRemovedNotification);
308             //3. lookup flowId
309             final FlowDescriptor flowDescriptor = deviceFlowRegistry.retrieveIdForFlow(flowRegKey);
310             //4. if flowId present:
311             if (flowDescriptor != null) {
312                 // a) construct flow path
313                 final KeyedInstanceIdentifier<Flow, FlowKey> flowPath = getDeviceInfo().getNodeInstanceIdentifier()
314                         .augmentation(FlowCapableNode.class)
315                         .child(Table.class, flowDescriptor.getTableKey())
316                         .child(Flow.class, new FlowKey(flowDescriptor.getFlowId()));
317                 // b) notify listener
318                 itemLifecycleListener.onRemoved(flowPath);
319             } else {
320                 LOG.debug("flow id not found: nodeId={} tableId={}, priority={}",
321                         getDeviceInfo().getNodeId(), flowRegKey.getTableId(), flowRemovedNotification.getPriority());
322             }
323         }
324     }
325
326     @Override
327     public void processPortStatusMessage(final PortStatusMessage portStatus) {
328         messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
329         final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator.translate(portStatus, getDeviceInfo(), null);
330
331         final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = provideIIToNodeConnector(portStatus.getPortNo(), portStatus.getVersion());
332         try {
333             if (portStatus.getReason().equals(PortReason.OFPPRADD) || portStatus.getReason().equals(PortReason.OFPPRMODIFY)) {
334                 // because of ADD status node connector has to be created
335                 final NodeConnectorBuilder nConnectorBuilder = new NodeConnectorBuilder().setKey(iiToNodeConnector.getKey());
336                 nConnectorBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build());
337                 nConnectorBuilder.addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector);
338                 writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, nConnectorBuilder.build());
339             } else if (portStatus.getReason().equals(PortReason.OFPPRDELETE)) {
340                 addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
341             }
342             submitTransaction();
343         } catch (final Exception e) {
344             LOG.warn("Error processing port status message: ", e);
345         }
346     }
347
348     private KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> provideIIToNodeConnector(final long portNo, final short version) {
349         final InstanceIdentifier<Node> iiToNodes = getDeviceInfo().getNodeInstanceIdentifier();
350         final BigInteger dataPathId = getDeviceInfo().getDatapathId();
351         final NodeConnectorId nodeConnectorId = NodeStaticReplyTranslatorUtil.nodeConnectorId(dataPathId.toString(), portNo, version);
352         return iiToNodes.child(NodeConnector.class, new NodeConnectorKey(nodeConnectorId));
353     }
354
355     @Override
356     public void processPacketInMessage(final PacketInMessage packetInMessage) {
357         messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH);
358         final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
359         final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
360
361         if (packetReceived == null) {
362             LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
363             messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
364             return;
365         } else {
366             messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
367         }
368
369         if (!packetInLimiter.acquirePermit()) {
370             LOG.debug("Packet limited");
371             // TODO: save packet into emergency slot if possible
372             messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
373             return;
374         }
375
376         final ListenableFuture<?> offerNotification = notificationPublishService.offerNotification(packetReceived);
377         if (NotificationPublishService.REJECTED.equals(offerNotification)) {
378             LOG.debug("notification offer rejected");
379             messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
380             packetInLimiter.drainLowWaterMark();
381             packetInLimiter.releasePermit();
382             return;
383         }
384
385         Futures.addCallback(offerNotification, new FutureCallback<Object>() {
386             @Override
387             public void onSuccess(final Object result) {
388                 messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
389                 packetInLimiter.releasePermit();
390             }
391
392             @Override
393             public void onFailure(final Throwable t) {
394                 messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
395                 LOG.debug("notification offer failed: {}", t.getMessage());
396                 LOG.trace("notification offer failed..", t);
397                 packetInLimiter.releasePermit();
398             }
399         });
400     }
401
402     @Override
403     public void processExperimenterMessage(final ExperimenterMessage notification) {
404         // lookup converter
405         final ExperimenterDataOfChoice vendorData = notification.getExperimenterDataOfChoice();
406         final MessageTypeKey<? extends ExperimenterDataOfChoice> key = new MessageTypeKey<>(
407                 getDeviceInfo().getVersion(),
408                 (Class<? extends ExperimenterDataOfChoice>) vendorData.getImplementedInterface());
409         final ConvertorMessageFromOFJava<ExperimenterDataOfChoice, MessagePath> messageConverter = extensionConverterProvider.getMessageConverter(key);
410         if (messageConverter == null) {
411             LOG.warn("custom converter for {}[OF:{}] not found",
412                     notification.getExperimenterDataOfChoice().getImplementedInterface(),
413                     getDeviceInfo().getVersion());
414             return;
415         }
416         // build notification
417         final ExperimenterMessageOfChoice messageOfChoice;
418         try {
419             messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
420             final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new ExperimenterMessageFromDevBuilder()
421                 .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
422                     .setExperimenterMessageOfChoice(messageOfChoice);
423             // publish
424             notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
425         } catch (final ConversionException e) {
426             LOG.error("Conversion of experimenter notification failed", e);
427         }
428     }
429
430     @Override
431     public TranslatorLibrary oook() {
432         return translatorLibrary;
433     }
434
435     @Override
436     public synchronized void close() {
437         LOG.debug("closing deviceContext: {}, nodeId:{}",
438                 getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress(),
439                 getDeviceInfo().getNodeId());
440         // NOOP
441         throw new UnsupportedOperationException("Autocloseble.close will be removed soon");
442     }
443
444     @Override
445     public void setCurrentBarrierTimeout(final Timeout timeout) {
446         barrierTaskTimeout = timeout;
447     }
448
449     @Override
450     public Timeout getBarrierTaskTimeout() {
451         return barrierTaskTimeout;
452     }
453
454     @Override
455     public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
456         this.notificationPublishService = notificationPublishService;
457     }
458
459     @Override
460     public MessageSpy getMessageSpy() {
461         return messageSpy;
462     }
463
464     @Override
465     public void onPublished() {
466         Verify.verify(CONTEXT_STATE.INITIALIZATION.equals(getState()));
467         setState(CONTEXT_STATE.WORKING);
468         primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
469         for (final ConnectionContext switchAuxConnectionContext : auxiliaryConnectionContexts.values()) {
470             switchAuxConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
471         }
472     }
473
474     @Override
475     public MultiMsgCollector getMultiMsgCollector(final RequestContext<List<MultipartReply>> requestContext) {
476         return new MultiMsgCollectorImpl(this, requestContext);
477     }
478
479     @Override
480     public void updatePacketInRateLimit(final long upperBound) {
481         packetInLimiter.changeWaterMarks((int) (LOW_WATERMARK_FACTOR * upperBound), (int) (HIGH_WATERMARK_FACTOR * upperBound));
482     }
483
484     @Override
485     public ItemLifeCycleRegistry getItemLifeCycleSourceRegistry() {
486         return itemLifeCycleSourceRegistry;
487     }
488
489     @Override
490     public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
491         this.extensionConverterProvider = extensionConverterProvider;
492     }
493
494     @Override
495     public ExtensionConverterProvider getExtensionConverterProvider() {
496         return extensionConverterProvider;
497     }
498
499     @Override
500     public synchronized void shutdownConnection() {
501         LOG.debug("Shutdown method for node {}", getDeviceInfo().getNodeId());
502         if (CONTEXT_STATE.TERMINATION.equals(getState())) {
503             LOG.debug("DeviceCtx for Node {} is in termination process.", getDeviceInfo().getNodeId());
504             return;
505         }
506         setState(CONTEXT_STATE.TERMINATION);
507
508         if (ConnectionContext.CONNECTION_STATE.RIP.equals(getPrimaryConnectionContext().getConnectionState())) {
509             LOG.debug("ConnectionCtx for Node {} is in RIP state.", getDeviceInfo().getNodeId());
510             return;
511         }
512         /* Terminate Auxiliary Connection */
513         for (final ConnectionContext connectionContext : auxiliaryConnectionContexts.values()) {
514             LOG.debug("Closing auxiliary connection {}", connectionContext.getNodeId());
515             connectionContext.closeConnection(false);
516         }
517         /* Terminate Primary Connection */
518         getPrimaryConnectionContext().closeConnection(true);
519         /* Close all Group Registry */
520         deviceGroupRegistry.close();
521         deviceFlowRegistry.close();
522         deviceMeterRegistry.close();
523     }
524
525     @Override
526     public ListenableFuture<Void> shuttingDownDataStoreTransactions() {
527         return transactionChainManager.shuttingDown();
528     }
529
530     @VisibleForTesting
531     TransactionChainManager getTransactionChainManager() {
532         return this.transactionChainManager;
533     }
534
535     @Override
536     public void setSwitchFeaturesMandatory(boolean switchFeaturesMandatory) {
537         this.switchFeaturesMandatory = switchFeaturesMandatory;
538     }
539
540     @Override
541     public CONTEXT_STATE getState() {
542         return this.state;
543     }
544
545     @Override
546     public void setState(CONTEXT_STATE state) {
547         this.state = state;
548     }
549
550     @Override
551     public void startupClusterServices() throws ExecutionException, InterruptedException {
552         LOG.debug("Initializing transaction chain manager for node {}", getDeviceInfo().getNodeId());
553         this.transactionChainManager.activateTransactionManager();
554         LOG.debug("Waiting to get node {} information", getDeviceInfo().getNodeId());
555         DeviceInitializationUtils.initializeNodeInformation(this, switchFeaturesMandatory, this.convertorExecutor).get();
556     }
557
558     @Override
559     public ListenableFuture<Void> stopClusterServices() {
560         return this.transactionChainManager.deactivateTransactionManager();
561     }
562
563     @Override
564     public ServiceGroupIdentifier getServiceIdentifier() {
565         return this.deviceInfo.getServiceIdentifier();
566     }
567
568     @Override
569     public DeviceInfo getDeviceInfo() {
570         return this.deviceInfo;
571     }
572 }