a49f8e3c4a9d6c3640e0902ac3f68fbedb595962
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.SettableFuture;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.Timeout;
18 import java.math.BigInteger;
19 import java.net.InetSocketAddress;
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.TreeMap;
27 import java.util.concurrent.ArrayBlockingQueue;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.ExecutionException;
30 import javax.annotation.Nonnull;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
33 import org.opendaylight.controller.md.sal.binding.api.NotificationRejectedException;
34 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
35 import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
36 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
37 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
38 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
39 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
40 import org.opendaylight.openflowplugin.api.openflow.connection.ThrottledNotificationsOfferer;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
42 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
43 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
44 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
45 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
46 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
47 import org.opendaylight.openflowplugin.api.openflow.device.exception.DeviceDataException;
48 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceContextClosedHandler;
49 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler;
50 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
51 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
52 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
53 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
54 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
55 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
56 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
57 import org.opendaylight.openflowplugin.impl.common.NodeStaticReplyTranslatorUtil;
58 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
59 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
60 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
61 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
62 import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
63 import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketIn;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
82 import org.opendaylight.yangtools.yang.binding.ChildOf;
83 import org.opendaylight.yangtools.yang.binding.DataObject;
84 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
85 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
86 import org.opendaylight.yangtools.yang.common.RpcError;
87 import org.opendaylight.yangtools.yang.common.RpcResult;
88 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
89 import org.slf4j.Logger;
90 import org.slf4j.LoggerFactory;
91
92 /**
93  *
94  */
95 public class DeviceContextImpl implements DeviceContext {
96
97     private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
98     public static final String DEVICE_DISCONNECTED = "Device disconnected.";
99
100     private final ConnectionContext primaryConnectionContext;
101     private final DeviceState deviceState;
102     private final DataBroker dataBroker;
103     private final HashedWheelTimer hashedWheelTimer;
104     private final Map<Long, RequestContext> requests = new TreeMap<>();
105
106     private final Map<SwitchConnectionDistinguisher, ConnectionContext> auxiliaryConnectionContexts;
107     private final TransactionChainManager txChainManager;
108     private TranslatorLibrary translatorLibrary;
109     private final DeviceFlowRegistry deviceFlowRegistry;
110     private final DeviceGroupRegistry deviceGroupRegistry;
111     private final DeviceMeterRegistry deviceMeterRegistry;
112     private Timeout barrierTaskTimeout;
113     private NotificationService notificationService;
114     private final MessageSpy<Class<?>> messageSpy;
115     private DeviceDisconnectedHandler deviceDisconnectedHandler;
116     private final Collection<DeviceContextClosedHandler> closeHandlers = new HashSet<>();
117     private NotificationPublishService notificationPublishService;
118     private final ThrottledNotificationsOfferer throttledConnectionsHolder;
119     private final BlockingQueue<PacketReceived> bumperQueue;
120     private final OutboundQueue outboundQueueProvider;
121
122     @Override
123     public MultiMsgCollector getMultiMsgCollector() {
124         return multiMsgCollector;
125     }
126
127     @Override
128     public Long getReservedXid() {
129         return outboundQueueProvider.reserveEntry();
130     }
131
132     private final MultiMsgCollector multiMsgCollector = new MultiMsgCollectorImpl();
133
134
135     @VisibleForTesting
136     DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
137                       @Nonnull final DeviceState deviceState,
138                       @Nonnull final DataBroker dataBroker,
139                       @Nonnull final HashedWheelTimer hashedWheelTimer,
140                       @Nonnull final MessageSpy _messageSpy,
141                       @Nonnull final ThrottledNotificationsOfferer throttledConnectionsHolder) {
142         this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext);
143         this.deviceState = Preconditions.checkNotNull(deviceState);
144         this.dataBroker = Preconditions.checkNotNull(dataBroker);
145         this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer);
146         txChainManager = new TransactionChainManager(dataBroker, hashedWheelTimer, 500L, 500L);
147         auxiliaryConnectionContexts = new HashMap<>();
148         deviceFlowRegistry = new DeviceFlowRegistryImpl();
149         deviceGroupRegistry = new DeviceGroupRegistryImpl();
150         deviceMeterRegistry = new DeviceMeterRegistryImpl();
151         messageSpy = _messageSpy;
152         this.throttledConnectionsHolder = throttledConnectionsHolder;
153         bumperQueue = new ArrayBlockingQueue<>(5000);
154         multiMsgCollector.setDeviceReplyProcessor(this);
155         outboundQueueProvider = Preconditions.checkNotNull(primaryConnectionContext.getOutboundQueueProvider());
156     }
157
158     /**
159      * This method is called from {@link DeviceManagerImpl} only. So we could say "posthandshake process finish"
160      * and we are able to set a scheduler for an automatic transaction submitting by time (0,5sec).
161      */
162     void submitTransaction() {
163         txChainManager.enableSubmit();
164         txChainManager.submitTransaction();
165     }
166
167     @Override
168     public <M extends ChildOf<DataObject>> void onMessage(final M message, final RequestContext requestContext) {
169         // TODO Auto-generated method stub
170
171     }
172
173     @Override
174     public void addAuxiliaryConenctionContext(final ConnectionContext connectionContext) {
175         final SwitchConnectionDistinguisher connectionDistinguisher = createConnectionDistinguisher(connectionContext);
176         auxiliaryConnectionContexts.put(connectionDistinguisher, connectionContext);
177     }
178
179     private static SwitchConnectionDistinguisher createConnectionDistinguisher(final ConnectionContext connectionContext) {
180         return new SwitchConnectionCookieOFImpl(connectionContext.getFeatures().getAuxiliaryId());
181     }
182
183     @Override
184     public void removeAuxiliaryConenctionContext(final ConnectionContext connectionContext) {
185         // TODO Auto-generated method stub
186     }
187
188     @Override
189     public DeviceState getDeviceState() {
190         return deviceState;
191     }
192
193     @Override
194     public ReadTransaction getReadTransaction() {
195         return dataBroker.newReadOnlyTransaction();
196     }
197
198     @Override
199     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
200                                                           final InstanceIdentifier<T> path, final T data) {
201         txChainManager.writeToTransaction(store, path, data);
202     }
203
204     @Override
205     public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store, final InstanceIdentifier<T> path) {
206         txChainManager.addDeleteOperationTotTxChain(store, path);
207     }
208
209     @Override
210     public ConnectionContext getPrimaryConnectionContext() {
211         return primaryConnectionContext;
212     }
213
214     @Override
215     public ConnectionContext getAuxiliaryConnectiobContexts(final BigInteger cookie) {
216         return auxiliaryConnectionContexts.get(new SwitchConnectionCookieOFImpl(cookie.longValue()));
217     }
218
219     @Override
220     public RequestContext lookupRequest(final Xid xid) {
221         synchronized (requests) {
222             return requests.get(xid.getValue());
223         }
224     }
225
226     @Override
227     public int getNumberOfOutstandingRequests() {
228         synchronized (requests) {
229             return requests.size();
230         }
231     }
232
233     @Override
234     public void hookRequestCtx(final Xid xid, final RequestContext requestFutureContext) {
235         synchronized (requests) {
236             requests.put(xid.getValue(), requestFutureContext);
237         }
238     }
239
240     @Override
241     public RequestContext unhookRequestCtx(final Xid xid) {
242         synchronized (requests) {
243             return requests.remove(xid.getValue());
244         }
245     }
246
247     @Override
248     public DeviceFlowRegistry getDeviceFlowRegistry() {
249         return deviceFlowRegistry;
250     }
251
252     @Override
253     public DeviceGroupRegistry getDeviceGroupRegistry() {
254         return deviceGroupRegistry;
255     }
256
257     @Override
258     public DeviceMeterRegistry getDeviceMeterRegistry() {
259         return deviceMeterRegistry;
260     }
261
262     @Override
263     public void processReply(final OfHeader ofHeader) {
264         final RequestContext requestContext = requests.remove(ofHeader.getXid());
265         if (null != requestContext) {
266             final SettableFuture replyFuture = requestContext.getFuture();
267             RpcResult<OfHeader> rpcResult;
268             if (ofHeader instanceof Error) {
269                 //TODO : this is the point, where we can discover that add flow operation failed and where we should
270                 //TODO : remove this flow from deviceFlowRegistry
271                 final Error error = (Error) ofHeader;
272                 final String message = "Operation on device failed with xid " + ofHeader.getXid() + ".";
273                 rpcResult = RpcResultBuilder
274                         .<OfHeader>failed()
275                         .withError(RpcError.ErrorType.APPLICATION, message, new DeviceDataException(message, error))
276                         .build();
277                 messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
278             } else {
279                 rpcResult = RpcResultBuilder
280                         .<OfHeader>success()
281                         .withResult(ofHeader)
282                         .build();
283                 messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
284             }
285
286             replyFuture.set(rpcResult);
287             try {
288                 requestContext.close();
289             } catch (final Exception e) {
290                 LOG.warn("Closing RequestContext failed: {}", e.getMessage());
291                 LOG.debug("Closing RequestContext failed.. ", e);
292             }
293         } else {
294             LOG.warn("Can't find request context registered for xid : {}. Type of reply: {}. From address: {}", ofHeader.getXid(), ofHeader.getClass().getName(),
295                     getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress());
296         }
297     }
298
299     @Override
300     public void processReply(final Xid xid, final List<MultipartReply> ofHeaderList) {
301         final RequestContext requestContext;
302         synchronized (requests) {
303             requestContext = requests.remove(xid.getValue());
304         }
305         if (null != requestContext) {
306             final SettableFuture replyFuture = requestContext.getFuture();
307             final RpcResult<List<MultipartReply>> rpcResult = RpcResultBuilder
308                     .<List<MultipartReply>>success()
309                     .withResult(ofHeaderList)
310                     .build();
311             replyFuture.set(rpcResult);
312             for (final MultipartReply multipartReply : ofHeaderList) {
313                 messageSpy.spyMessage(multipartReply.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
314             }
315
316             unhookRequestCtx(xid);
317             try {
318                 requestContext.close();
319             } catch (final Exception e) {
320                 LOG.warn("Closing RequestContext failed: {}", e.getMessage());
321                 LOG.debug("Closing RequestContext failed.. ", e);
322             }
323         } else {
324             LOG.warn("Can't find request context registered for xid : {}. Type of reply: MULTIPART. From address: {}", xid.getValue(),
325                     getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress());
326         }
327     }
328
329     @Override
330     public void processException(final Xid xid, final DeviceDataException deviceDataException) {
331
332         LOG.trace("Processing exception for xid : {}", xid.getValue());
333
334         final RequestContext requestContext = requests.remove(xid.getValue());
335
336         if (null != requestContext) {
337             final SettableFuture replyFuture = requestContext.getFuture();
338             final RpcResult<List<OfHeader>> rpcResult = RpcResultBuilder
339                     .<List<OfHeader>>failed()
340                     .withError(RpcError.ErrorType.APPLICATION, String.format("Message processing failed : %s", deviceDataException.getError()), deviceDataException)
341                     .build();
342             replyFuture.set(rpcResult);
343             messageSpy.spyMessage(deviceDataException.getClass(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
344             try {
345                 requestContext.close();
346             } catch (final Exception e) {
347                 LOG.warn("Closing RequestContext failed: ", e);
348                 LOG.debug("Closing RequestContext failed..", e);
349             }
350         } else {
351             LOG.warn("Can't find request context registered for xid : {}. Exception message {}",
352                     xid.getValue(), deviceDataException.getMessage());
353         }
354     }
355
356     @Override
357     public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
358         //TODO: will be defined later
359     }
360
361     @Override
362     public void processPortStatusMessage(final PortStatusMessage portStatus) {
363         messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
364         final TranslatorKey translatorKey = new TranslatorKey(portStatus.getVersion(), PortGrouping.class.getName());
365         final MessageTranslator<PortGrouping, FlowCapableNodeConnector> messageTranslator = translatorLibrary.lookupTranslator(translatorKey);
366         final FlowCapableNodeConnector flowCapableNodeConnector = messageTranslator.translate(portStatus, this, null);
367
368         final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = provideIIToNodeConnector(portStatus.getPortNo(), portStatus.getVersion());
369         if (portStatus.getReason().equals(PortReason.OFPPRADD) || portStatus.getReason().equals(PortReason.OFPPRMODIFY)) {
370             // because of ADD status node connector has to be created
371             final NodeConnectorBuilder nConnectorBuilder = new NodeConnectorBuilder().setKey(iiToNodeConnector.getKey());
372             nConnectorBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build());
373             nConnectorBuilder.addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector);
374             writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, nConnectorBuilder.build());
375         } else if (portStatus.getReason().equals(PortReason.OFPPRDELETE)) {
376             addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
377         }
378     }
379
380     private KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> provideIIToNodeConnector(final long portNo, final short version) {
381         final InstanceIdentifier<Node> iiToNodes = deviceState.getNodeInstanceIdentifier();
382         final BigInteger dataPathId = deviceState.getFeatures().getDatapathId();
383         final NodeConnectorId nodeConnectorId = NodeStaticReplyTranslatorUtil.nodeConnectorId(dataPathId.toString(), portNo, version);
384         return iiToNodes.child(NodeConnector.class, new NodeConnectorKey(nodeConnectorId));
385     }
386
387     @Override
388     public void processPacketInMessage(final PacketInMessage packetInMessage) {
389         messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH);
390
391         final TranslatorKey translatorKey = new TranslatorKey(packetInMessage.getVersion(), PacketIn.class.getName());
392         final MessageTranslator<PacketInMessage, PacketReceived> messageTranslator = translatorLibrary.lookupTranslator(translatorKey);
393         final PacketReceived packetReceived = messageTranslator.translate(packetInMessage, this, null);
394
395         if (packetReceived != null) {
396             messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
397         } else {
398             messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
399             return;
400         }
401
402         ListenableFuture<?> listenableFuture = notificationPublishService.offerNotification(packetReceived);
403         if (NotificationPublishService.REJECTED.equals(listenableFuture)) {
404             LOG.debug("notification offer rejected");
405             messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
406         } else if (listenableFuture.isDone()) {
407             Object x = null;
408             try {
409                 x = listenableFuture.get();
410                 messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
411             } catch (InterruptedException e) {
412                 LOG.debug("notification offer interrupted: {}", e.getMessage());
413                 LOG.trace("notification offer interrupted..", e);
414             } catch (ExecutionException e) {
415                 LOG.debug("notification offer failed: {}", e.getMessage());
416                 LOG.trace("notification offer failed..", e);
417             } finally {
418                 if (null == x) {
419                     messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
420                 }
421             }
422         } else {
423             messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
424         }
425     }
426
427     private void applyThrottling(final PacketReceived packetReceived, final ConnectionAdapter connectionAdapter) {
428         final InetSocketAddress remoteAddress = connectionAdapter.getRemoteAddress();
429         LOG.debug("Notification offer refused by notification service.");
430         messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
431         connectionAdapter.setAutoRead(false);
432
433         LOG.debug("Throttling ingress for {}", remoteAddress);
434         final ListenableFuture<Void> queueDone;
435
436         // adding first notification
437         bumperQueue.offer(packetReceived);
438         synchronized (bumperQueue) {
439             queueDone = throttledConnectionsHolder.applyThrottlingOnConnection(bumperQueue);
440         }
441         Futures.addCallback(queueDone, new FutureCallback<Void>() {
442             @Override
443             public void onSuccess(final Void result) {
444                 LOG.debug("Un - throttling ingress for {}", remoteAddress);
445                 connectionAdapter.setAutoRead(true);
446             }
447
448             @Override
449             public void onFailure(final Throwable t) {
450                 LOG.warn("failed to offer queued notification for {}: {}", remoteAddress, t.getMessage());
451                 LOG.debug("failed to offer queued notification for {}.. ", remoteAddress, t);
452             }
453         });
454     }
455
456     @Override
457     public TranslatorLibrary oook() {
458         return translatorLibrary;
459     }
460
461     @Override
462     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
463         this.translatorLibrary = translatorLibrary;
464     }
465
466     @Override
467     public HashedWheelTimer getTimer() {
468         return hashedWheelTimer;
469     }
470
471     @Override
472     public void close() throws Exception {
473         deviceState.setValid(false);
474
475         LOG.trace("Removing node {} from operational DS.", getDeviceState().getNodeId());
476         addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, getDeviceState().getNodeInstanceIdentifier());
477
478         deviceGroupRegistry.close();
479         deviceFlowRegistry.close();
480         deviceMeterRegistry.close();
481
482         if (primaryConnectionContext.getConnectionAdapter().isAlive()) {
483             primaryConnectionContext.setConnectionState(ConnectionContext.CONNECTION_STATE.RIP);
484             primaryConnectionContext.getConnectionAdapter().disconnect();
485         }
486         for (final Map.Entry<Long, RequestContext> entry : requests.entrySet()) {
487             RequestContextUtil.closeRequestContextWithRpcError(entry.getValue(), DEVICE_DISCONNECTED);
488         }
489         for (final ConnectionContext connectionContext : auxiliaryConnectionContexts.values()) {
490             if (connectionContext.getConnectionAdapter().isAlive()) {
491                 connectionContext.getConnectionAdapter().disconnect();
492             }
493         }
494         for (final DeviceContextClosedHandler deviceContextClosedHandler : closeHandlers) {
495             deviceContextClosedHandler.onDeviceContextClosed(this);
496         }
497
498     }
499
500     @Override
501     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
502         if (this.getPrimaryConnectionContext().equals(connectionContext)) {
503             try {
504                 close();
505             } catch (final Exception e) {
506                 LOG.trace("Error closing device context.");
507             }
508             if (null != deviceDisconnectedHandler) {
509                 deviceDisconnectedHandler.onDeviceDisconnected(connectionContext);
510             }
511         } else {
512             final SwitchConnectionDistinguisher connectionDistinguisher = createConnectionDistinguisher(connectionContext);
513             auxiliaryConnectionContexts.remove(connectionDistinguisher);
514         }
515     }
516
517     @Override
518     public RequestContext extractNextOutstandingMessage(final long barrierXid) {
519         RequestContext nextMessage = null;
520         synchronized (requests) {
521             final Iterator<Long> keyIterator = requests.keySet().iterator();
522             if (keyIterator.hasNext()) {
523                 final Long oldestXid = keyIterator.next();
524                 if (oldestXid < barrierXid) {
525                     nextMessage = requests.remove(oldestXid);
526                 }
527             }
528         }
529         return nextMessage;
530     }
531
532     @Override
533     public void setCurrentBarrierTimeout(final Timeout timeout) {
534         barrierTaskTimeout = timeout;
535     }
536
537     @Override
538     public Timeout getBarrierTaskTimeout() {
539         return barrierTaskTimeout;
540     }
541
542     @Override
543     public void setNotificationService(final NotificationService notificationServiceParam) {
544         notificationService = notificationServiceParam;
545     }
546
547     @Override
548     public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
549         this.notificationPublishService = notificationPublishService;
550     }
551
552     @Override
553     public MessageSpy getMessageSpy() {
554         return messageSpy;
555     }
556
557     @Override
558     public void setDeviceDisconnectedHandler(final DeviceDisconnectedHandler deviceDisconnectedHandler) {
559         this.deviceDisconnectedHandler = deviceDisconnectedHandler;
560     }
561
562     @Override
563     public void addDeviceContextClosedHandler(final DeviceContextClosedHandler deviceContextClosedHandler) {
564         this.closeHandlers.add(deviceContextClosedHandler);
565     }
566
567     @Override
568     public void startGatheringOperationsToOneTransaction() {
569         txChainManager.startGatheringOperationsToOneTransaction();
570     }
571
572     @Override
573     public void commitOperationsGatheredInOneTransaction() {
574         txChainManager.commitOperationsGatheredInOneTransaction();
575     }
576
577 }