359e7493a5977dbacab805a49b2cfcb5772a6372
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.device;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.SettableFuture;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.Timeout;
18 import java.math.BigInteger;
19 import java.net.InetSocketAddress;
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.TreeMap;
27 import java.util.concurrent.ArrayBlockingQueue;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.atomic.AtomicLong;
31 import javax.annotation.Nonnull;
32 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
33 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
34 import org.opendaylight.controller.md.sal.binding.api.NotificationRejectedException;
35 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
36 import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
37 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
38 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
39 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
40 import org.opendaylight.openflowplugin.api.openflow.connection.ThrottledNotificationsOfferer;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
42 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
43 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
44 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
45 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
46 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
47 import org.opendaylight.openflowplugin.api.openflow.device.exception.DeviceDataException;
48 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceContextClosedHandler;
49 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler;
50 import org.opendaylight.openflowplugin.api.openflow.device.listener.OpenflowMessageListenerFacade;
51 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
52 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
53 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
54 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
55 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
56 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
57 import org.opendaylight.openflowplugin.impl.common.NodeStaticReplyTranslatorUtil;
58 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
59 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
60 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
61 import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
62 import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketIn;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
81 import org.opendaylight.yangtools.yang.binding.ChildOf;
82 import org.opendaylight.yangtools.yang.binding.DataObject;
83 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
84 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
85 import org.opendaylight.yangtools.yang.common.RpcError;
86 import org.opendaylight.yangtools.yang.common.RpcResult;
87 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
88 import org.slf4j.Logger;
89 import org.slf4j.LoggerFactory;
90
91 /**
92  *
93  */
94 public class DeviceContextImpl implements DeviceContext {
95
96     private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
97     public static final String DEVICE_DISCONNECTED = "Device disconnected.";
98
99     private final ConnectionContext primaryConnectionContext;
100     private final DeviceState deviceState;
101     private final DataBroker dataBroker;
102     private final XidGenerator xidGenerator;
103     private final HashedWheelTimer hashedWheelTimer;
104     private final Map<Long, RequestContext> requests = new TreeMap<>();
105
106     private final Map<SwitchConnectionDistinguisher, ConnectionContext> auxiliaryConnectionContexts;
107     private final TransactionChainManager txChainManager;
108     private TranslatorLibrary translatorLibrary;
109     private OpenflowMessageListenerFacade openflowMessageListenerFacade;
110     private final DeviceFlowRegistry deviceFlowRegistry;
111     private final DeviceGroupRegistry deviceGroupRegistry;
112     private final DeviceMeterRegistry deviceMeterRegistry;
113     private Timeout barrierTaskTimeout;
114     private NotificationService notificationService;
115     private final MessageSpy<Class> messageSpy;
116     private DeviceDisconnectedHandler deviceDisconnectedHandler;
117     private final Collection<DeviceContextClosedHandler> closeHandlers = new HashSet<>();
118     private NotificationPublishService notificationPublishService;
119     private final ThrottledNotificationsOfferer throttledConnectionsHolder;
120     private BlockingQueue<PacketInMessage> bumperQueue;
121
122
123     @VisibleForTesting
124     DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
125                       @Nonnull final DeviceState deviceState,
126                       @Nonnull final DataBroker dataBroker,
127                       @Nonnull final HashedWheelTimer hashedWheelTimer,
128                       @Nonnull final MessageSpy _messageSpy,
129                       @Nonnull final ThrottledNotificationsOfferer throttledConnectionsHolder) {
130         this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext);
131         this.deviceState = Preconditions.checkNotNull(deviceState);
132         this.dataBroker = Preconditions.checkNotNull(dataBroker);
133         this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer);
134         xidGenerator = new XidGenerator();
135         txChainManager = new TransactionChainManager(dataBroker, hashedWheelTimer, 500L, 500L);
136         auxiliaryConnectionContexts = new HashMap<>();
137         deviceFlowRegistry = new DeviceFlowRegistryImpl();
138         deviceGroupRegistry = new DeviceGroupRegistryImpl();
139         deviceMeterRegistry = new DeviceMeterRegistryImpl();
140         messageSpy = _messageSpy;
141         this.throttledConnectionsHolder = throttledConnectionsHolder;
142         bumperQueue = new ArrayBlockingQueue<PacketInMessage>(5000);
143     }
144
145     /**
146      * This method is called from {@link DeviceManagerImpl} only. So we could say "posthandshake process finish"
147      * and we are able to set a scheduler for an automatic transaction submitting by time (0,5sec).
148      */
149     void submitTransaction() {
150         txChainManager.enableSubmit();
151         txChainManager.submitTransaction();
152     }
153
154     @Override
155     public <M extends ChildOf<DataObject>> void onMessage(final M message, final RequestContext requestContext) {
156         // TODO Auto-generated method stub
157
158     }
159
160     @Override
161     public void addAuxiliaryConenctionContext(final ConnectionContext connectionContext) {
162         final SwitchConnectionDistinguisher connectionDistinguisher = createConnectionDistinguisher(connectionContext);
163         auxiliaryConnectionContexts.put(connectionDistinguisher, connectionContext);
164     }
165
166     private SwitchConnectionDistinguisher createConnectionDistinguisher(final ConnectionContext connectionContext) {
167         return new SwitchConnectionCookieOFImpl(connectionContext.getFeatures().getAuxiliaryId());
168     }
169
170     @Override
171     public void removeAuxiliaryConenctionContext(final ConnectionContext connectionContext) {
172         // TODO Auto-generated method stub
173     }
174
175     @Override
176     public DeviceState getDeviceState() {
177         return deviceState;
178     }
179
180     @Override
181     public ReadTransaction getReadTransaction() {
182         return dataBroker.newReadOnlyTransaction();
183     }
184
185     @Override
186     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
187                                                           final InstanceIdentifier<T> path, final T data) {
188         txChainManager.writeToTransaction(store, path, data);
189     }
190
191     @Override
192     public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store, final InstanceIdentifier<T> path) {
193         txChainManager.addDeleteOperationTotTxChain(store, path);
194     }
195
196     @Override
197     public ConnectionContext getPrimaryConnectionContext() {
198         return primaryConnectionContext;
199     }
200
201     @Override
202     public ConnectionContext getAuxiliaryConnectiobContexts(final BigInteger cookie) {
203         return auxiliaryConnectionContexts.get(new SwitchConnectionCookieOFImpl(cookie.longValue()));
204     }
205
206     @Override
207     public Xid getNextXid() {
208         return xidGenerator.generate();
209     }
210
211     @Override
212     public RequestContext lookupRequest(final Xid xid) {
213         synchronized (requests) {
214             return requests.get(xid.getValue());
215         }
216     }
217
218     @Override
219     public int getNumberOfOutstandingRequests() {
220         synchronized (requests) {
221             return requests.size();
222         }
223     }
224
225     @Override
226     public void hookRequestCtx(final Xid xid, final RequestContext requestFutureContext) {
227         synchronized (requests) {
228             requests.put(xid.getValue(), requestFutureContext);
229         }
230     }
231
232     @Override
233     public RequestContext unhookRequestCtx(final Xid xid) {
234         synchronized (requests) {
235             return requests.remove(xid.getValue());
236         }
237     }
238
239     @Override
240     public void attachOpenflowMessageListener(final OpenflowMessageListenerFacade openflowMessageListenerFacade) {
241         this.openflowMessageListenerFacade = openflowMessageListenerFacade;
242         primaryConnectionContext.getConnectionAdapter().setMessageListener(openflowMessageListenerFacade);
243     }
244
245     @Override
246     public OpenflowMessageListenerFacade getOpenflowMessageListenerFacade() {
247         return openflowMessageListenerFacade;
248     }
249
250     @Override
251     public DeviceFlowRegistry getDeviceFlowRegistry() {
252         return deviceFlowRegistry;
253     }
254
255     @Override
256     public DeviceGroupRegistry getDeviceGroupRegistry() {
257         return deviceGroupRegistry;
258     }
259
260     @Override
261     public DeviceMeterRegistry getDeviceMeterRegistry() {
262         return deviceMeterRegistry;
263     }
264
265     @Override
266     public void processReply(final OfHeader ofHeader) {
267         final RequestContext requestContext = requests.remove(ofHeader.getXid());
268         if (null != requestContext) {
269             final SettableFuture replyFuture = requestContext.getFuture();
270             RpcResult<OfHeader> rpcResult;
271             if (ofHeader instanceof Error) {
272                 //TODO : this is the point, where we can discover that add flow operation failed and where we should
273                 //TODO : remove this flow from deviceFlowRegistry
274                 final Error error = (Error) ofHeader;
275                 final String message = "Operation on device failed with xid " + ofHeader.getXid() + ".";
276                 rpcResult = RpcResultBuilder
277                         .<OfHeader>failed()
278                         .withError(RpcError.ErrorType.APPLICATION, message, new DeviceDataException(message, error))
279                         .build();
280                 messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
281             } else {
282                 rpcResult = RpcResultBuilder
283                         .<OfHeader>success()
284                         .withResult(ofHeader)
285                         .build();
286                 messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
287             }
288
289             replyFuture.set(rpcResult);
290             try {
291                 requestContext.close();
292             } catch (final Exception e) {
293                 LOG.warn("Closing RequestContext failed: {}", e.getMessage());
294                 LOG.debug("Closing RequestContext failed.. ", e);
295             }
296         } else {
297             LOG.warn("Can't find request context registered for xid : {}. Type of reply: {}. From address: {}", ofHeader.getXid(), ofHeader.getClass().getName(),
298                     getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress());
299         }
300     }
301
302     @Override
303     public void processReply(final Xid xid, final List<MultipartReply> ofHeaderList) {
304         final RequestContext requestContext;
305         synchronized (requests) {
306             requestContext = requests.remove(xid.getValue());
307         }
308         if (null != requestContext) {
309             final SettableFuture replyFuture = requestContext.getFuture();
310             final RpcResult<List<MultipartReply>> rpcResult = RpcResultBuilder
311                     .<List<MultipartReply>>success()
312                     .withResult(ofHeaderList)
313                     .build();
314             replyFuture.set(rpcResult);
315             for (final MultipartReply multipartReply : ofHeaderList) {
316                 messageSpy.spyMessage(multipartReply.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
317             }
318
319             try {
320                 requestContext.close();
321             } catch (final Exception e) {
322                 LOG.warn("Closing RequestContext failed: {}", e.getMessage());
323                 LOG.debug("Closing RequestContext failed.. ", e);
324             }
325         } else {
326             LOG.warn("Can't find request context registered for xid : {}. Type of reply: MULTIPART. From address: {}", xid.getValue(),
327                     getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress());
328         }
329     }
330
331     @Override
332     public void processException(final Xid xid, final DeviceDataException deviceDataException) {
333
334         LOG.trace("Processing exception for xid : {}", xid.getValue());
335
336         final RequestContext requestContext = requests.remove(xid.getValue());
337
338         if (null != requestContext) {
339             final SettableFuture replyFuture = requestContext.getFuture();
340             final RpcResult<List<OfHeader>> rpcResult = RpcResultBuilder
341                     .<List<OfHeader>>failed()
342                     .withError(RpcError.ErrorType.APPLICATION, String.format("Message processing failed : %s", deviceDataException.getError()), deviceDataException)
343                     .build();
344             replyFuture.set(rpcResult);
345             messageSpy.spyMessage(deviceDataException.getClass(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
346             try {
347                 requestContext.close();
348             } catch (final Exception e) {
349                 LOG.warn("Closing RequestContext failed: ", e);
350                 LOG.debug("Closing RequestContext failed..", e);
351             }
352         } else {
353             LOG.warn("Can't find request context registered for xid : {}. Exception message {}",
354                     xid.getValue(), deviceDataException.getMessage());
355         }
356     }
357
358     @Override
359     public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
360         //TODO: will be defined later
361     }
362
363     @Override
364     public void processPortStatusMessage(final PortStatusMessage portStatus) {
365         messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
366         final TranslatorKey translatorKey = new TranslatorKey(portStatus.getVersion(), PortGrouping.class.getName());
367         final MessageTranslator<PortGrouping, FlowCapableNodeConnector> messageTranslator = translatorLibrary.lookupTranslator(translatorKey);
368         final FlowCapableNodeConnector flowCapableNodeConnector = messageTranslator.translate(portStatus, this, null);
369
370         final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = provideIIToNodeConnector(portStatus.getPortNo(), portStatus.getVersion());
371         if (portStatus.getReason().equals(PortReason.OFPPRADD) || portStatus.getReason().equals(PortReason.OFPPRMODIFY)) {
372             // because of ADD status node connector has to be created
373             final NodeConnectorBuilder nConnectorBuilder = new NodeConnectorBuilder().setKey(iiToNodeConnector.getKey());
374             nConnectorBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build());
375             nConnectorBuilder.addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector);
376             writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, nConnectorBuilder.build());
377         } else if (portStatus.getReason().equals(PortReason.OFPPRDELETE)) {
378             addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
379         }
380     }
381
382     private KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> provideIIToNodeConnector(final long portNo, final short version) {
383         final InstanceIdentifier<Node> iiToNodes = deviceState.getNodeInstanceIdentifier();
384         final BigInteger dataPathId = deviceState.getFeatures().getDatapathId();
385         final NodeConnectorId nodeConnectorId = NodeStaticReplyTranslatorUtil.nodeConnectorId(dataPathId.toString(), portNo, version);
386         return iiToNodes.child(NodeConnector.class, new NodeConnectorKey(nodeConnectorId));
387     }
388
389     @Override
390     public void processPacketInMessage(final PacketInMessage packetInMessage) {
391         messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH);
392         final ConnectionAdapter connectionAdapter = this.getPrimaryConnectionContext().getConnectionAdapter();
393
394         final TranslatorKey translatorKey = new TranslatorKey(packetInMessage.getVersion(), PacketIn.class.getName());
395         final MessageTranslator<PacketInMessage, PacketReceived> messageTranslator = translatorLibrary.lookupTranslator(translatorKey);
396         final PacketReceived packetReceived = messageTranslator.translate(packetInMessage, this, null);
397
398         if (packetReceived != null) {
399             messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
400         } else {
401             messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
402             return;
403         }
404
405         if (throttledConnectionsHolder.isThrottlingEffective(bumperQueue)) {
406             boolean caught = bumperQueue.offer(packetInMessage);
407             if (!caught) {
408                 LOG.debug("ingress notification dropped - no place in bumper queue [{}]", connectionAdapter.getRemoteAddress());
409             }
410         } else {
411             ListenableFuture<?> listenableFuture = notificationPublishService.offerNotification(packetReceived);
412             if (listenableFuture.isDone()) {
413                 try {
414                     listenableFuture.get();
415                 } catch (InterruptedException e) {
416                     LOG.debug("notification offer interrupted: {}", e.getMessage());
417                     LOG.trace("notification offer interrupted..", e);
418                 } catch (ExecutionException e) {
419                     if (e.getCause() instanceof NotificationRejectedException) {
420                         applyThrottling(packetInMessage, connectionAdapter);
421                     } else {
422                         LOG.debug("notification offer failed: {}", e.getMessage());
423                         LOG.trace("notification offer failed..", e);
424                     }
425                 }
426             } else {
427                 messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
428             }
429         }
430     }
431
432     private void applyThrottling(PacketInMessage packetInMessage, final ConnectionAdapter connectionAdapter) {
433         final InetSocketAddress remoteAddress = connectionAdapter.getRemoteAddress();
434         LOG.debug("Notification offer refused by notification service.");
435         messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
436         connectionAdapter.setAutoRead(false);
437
438         LOG.debug("Throttling ingress for {}", remoteAddress);
439         final ListenableFuture<Void> queueDone;
440
441         // adding first notification
442         bumperQueue.offer(packetInMessage);
443         synchronized (bumperQueue) {
444             queueDone = throttledConnectionsHolder.applyThrottlingOnConnection(bumperQueue);
445         }
446         Futures.addCallback(queueDone, new FutureCallback<Void>() {
447             @Override
448             public void onSuccess(Void result) {
449                 LOG.debug("Un - throttling ingress for {}", remoteAddress);
450                 connectionAdapter.setAutoRead(true);
451             }
452
453             @Override
454             public void onFailure(Throwable t) {
455                 LOG.warn("failed to offer queued notification for {}: {}", remoteAddress, t.getMessage());
456                 LOG.debug("failed to offer queued notification for {}.. ", remoteAddress, t);
457             }
458         });
459     }
460
461     @Override
462     public TranslatorLibrary oook() {
463         return translatorLibrary;
464     }
465
466     @Override
467     public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
468         this.translatorLibrary = translatorLibrary;
469     }
470
471     @Override
472     public HashedWheelTimer getTimer() {
473         return hashedWheelTimer;
474     }
475
476     @Override
477     public void close() throws Exception {
478         deviceState.setValid(false);
479
480         LOG.trace("Removing node {} from operational DS.", getDeviceState().getNodeId());
481         addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, getDeviceState().getNodeInstanceIdentifier());
482
483         deviceGroupRegistry.close();
484         deviceFlowRegistry.close();
485         deviceMeterRegistry.close();
486
487         if (primaryConnectionContext.getConnectionAdapter().isAlive()) {
488             primaryConnectionContext.setConnectionState(ConnectionContext.CONNECTION_STATE.RIP);
489             primaryConnectionContext.getConnectionAdapter().disconnect();
490         }
491         for (final Map.Entry<Long, RequestContext> entry : requests.entrySet()) {
492             RequestContextUtil.closeRequestContextWithRpcError(entry.getValue(), DEVICE_DISCONNECTED);
493         }
494         for (final ConnectionContext connectionContext : auxiliaryConnectionContexts.values()) {
495             if (connectionContext.getConnectionAdapter().isAlive()) {
496                 connectionContext.getConnectionAdapter().disconnect();
497             }
498         }
499         for (final DeviceContextClosedHandler deviceContextClosedHandler : closeHandlers) {
500             deviceContextClosedHandler.onDeviceContextClosed(this);
501         }
502
503     }
504
505     @Override
506     public void onDeviceDisconnected(final ConnectionContext connectionContext) {
507         if (this.getPrimaryConnectionContext().equals(connectionContext)) {
508             try {
509                 close();
510             } catch (final Exception e) {
511                 LOG.trace("Error closing device context.");
512             }
513             if (null != deviceDisconnectedHandler) {
514                 deviceDisconnectedHandler.onDeviceDisconnected(connectionContext);
515             }
516         } else {
517             final SwitchConnectionDistinguisher connectionDistinguisher = createConnectionDistinguisher(connectionContext);
518             auxiliaryConnectionContexts.remove(connectionDistinguisher);
519         }
520     }
521
522
523     private class XidGenerator {
524
525         private final AtomicLong xid = new AtomicLong(0);
526
527         public Xid generate() {
528             return new Xid(xid.incrementAndGet());
529         }
530     }
531
532     @Override
533     public RequestContext extractNextOutstandingMessage(final long barrierXid) {
534         RequestContext nextMessage = null;
535         synchronized (requests) {
536             final Iterator<Long> keyIterator = requests.keySet().iterator();
537             if (keyIterator.hasNext()) {
538                 final Long oldestXid = keyIterator.next();
539                 if (oldestXid < barrierXid) {
540                     nextMessage = requests.remove(oldestXid);
541                 }
542             }
543         }
544         return nextMessage;
545     }
546
547     @Override
548     public void setCurrentBarrierTimeout(final Timeout timeout) {
549         barrierTaskTimeout = timeout;
550     }
551
552     @Override
553     public Timeout getBarrierTaskTimeout() {
554         return barrierTaskTimeout;
555     }
556
557     @Override
558     public void setNotificationService(final NotificationService notificationServiceParam) {
559         notificationService = notificationServiceParam;
560     }
561
562     @Override
563     public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
564         this.notificationPublishService = notificationPublishService;
565     }
566
567     @Override
568     public MessageSpy getMessageSpy() {
569         return messageSpy;
570     }
571
572     @Override
573     public void setDeviceDisconnectedHandler(final DeviceDisconnectedHandler deviceDisconnectedHandler) {
574         this.deviceDisconnectedHandler = deviceDisconnectedHandler;
575     }
576
577     @Override
578     public void addDeviceContextClosedHandler(final DeviceContextClosedHandler deviceContextClosedHandler) {
579         this.closeHandlers.add(deviceContextClosedHandler);
580     }
581
582     @Override
583     public void startGatheringOperationsToOneTransaction() {
584         txChainManager.startGatheringOperationsToOneTransaction();
585     }
586
587     @Override
588     public void commitOperationsGatheredInOneTransaction() {
589         txChainManager.commitOperationsGatheredInOneTransaction();
590     }
591
592 }