/*
 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.openflowplugin.impl.device;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.SettableFuture;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.Timeout;
18 import java.math.BigInteger;
19 import java.net.InetSocketAddress;
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.Iterator;
24 import java.util.List;
26 import java.util.TreeMap;
27 import java.util.concurrent.ArrayBlockingQueue;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.ExecutionException;
30 import javax.annotation.Nonnull;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
33 import org.opendaylight.controller.md.sal.binding.api.NotificationRejectedException;
34 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
35 import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
36 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
37 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
38 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
39 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
40 import org.opendaylight.openflowplugin.api.openflow.connection.ThrottledNotificationsOfferer;
41 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
42 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
43 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
44 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
45 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
46 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
47 import org.opendaylight.openflowplugin.api.openflow.device.exception.DeviceDataException;
48 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceContextClosedHandler;
49 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler;
50 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
51 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
52 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
53 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
54 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
55 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
56 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
57 import org.opendaylight.openflowplugin.impl.common.NodeStaticReplyTranslatorUtil;
58 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
59 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
60 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
61 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
62 import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
63 import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketIn;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
82 import org.opendaylight.yangtools.yang.binding.ChildOf;
83 import org.opendaylight.yangtools.yang.binding.DataObject;
84 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
85 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
86 import org.opendaylight.yangtools.yang.common.RpcError;
87 import org.opendaylight.yangtools.yang.common.RpcResult;
88 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
89 import org.slf4j.Logger;
90 import org.slf4j.LoggerFactory;
95 public class DeviceContextImpl implements DeviceContext {
97 private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
98 public static final String DEVICE_DISCONNECTED = "Device disconnected.";
100 private final ConnectionContext primaryConnectionContext;
101 private final DeviceState deviceState;
102 private final DataBroker dataBroker;
103 private final HashedWheelTimer hashedWheelTimer;
104 private final Map<Long, RequestContext> requests = new TreeMap<>();
106 private final Map<SwitchConnectionDistinguisher, ConnectionContext> auxiliaryConnectionContexts;
107 private final TransactionChainManager txChainManager;
108 private TranslatorLibrary translatorLibrary;
109 private final DeviceFlowRegistry deviceFlowRegistry;
110 private final DeviceGroupRegistry deviceGroupRegistry;
111 private final DeviceMeterRegistry deviceMeterRegistry;
112 private Timeout barrierTaskTimeout;
113 private NotificationService notificationService;
114 private final MessageSpy<Class<?>> messageSpy;
115 private DeviceDisconnectedHandler deviceDisconnectedHandler;
116 private final Collection<DeviceContextClosedHandler> closeHandlers = new HashSet<>();
117 private NotificationPublishService notificationPublishService;
118 private final ThrottledNotificationsOfferer throttledConnectionsHolder;
119 private final BlockingQueue<PacketReceived> bumperQueue;
120 private final OutboundQueue outboundQueueProvider;
123 public MultiMsgCollector getMultiMsgCollector() {
124 return multiMsgCollector;
128 public Long getReservedXid() {
129 return outboundQueueProvider.reserveEntry();
132 private final MultiMsgCollector multiMsgCollector = new MultiMsgCollectorImpl();
/**
 * Creates the context for one connected device.
 *
 * <p>All arguments are mandatory and are rejected with a {@link NullPointerException} when null,
 * as is a primary connection that cannot supply an outbound queue.
 *
 * @param primaryConnectionContext   primary connection to the device; also the source of the outbound queue
 * @param deviceState                state holder for the device
 * @param dataBroker                 broker used for read transactions and by the transaction chain manager
 * @param hashedWheelTimer           timer shared with the transaction chain manager
 * @param _messageSpy                statistics sink for processed messages
 * @param throttledConnectionsHolder offerer used when ingress throttling is applied
 */
@VisibleForTesting
DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
                  @Nonnull final DeviceState deviceState,
                  @Nonnull final DataBroker dataBroker,
                  @Nonnull final HashedWheelTimer hashedWheelTimer,
                  @Nonnull final MessageSpy _messageSpy,
                  @Nonnull final ThrottledNotificationsOfferer throttledConnectionsHolder) {
    this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext);
    this.deviceState = Preconditions.checkNotNull(deviceState);
    this.dataBroker = Preconditions.checkNotNull(dataBroker);
    this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer);
    // The remaining @Nonnull arguments are validated the same way as the ones above.
    messageSpy = Preconditions.checkNotNull(_messageSpy);
    this.throttledConnectionsHolder = Preconditions.checkNotNull(throttledConnectionsHolder);
    // Resolve the outbound queue before "this" is published anywhere, so a failing precondition
    // never leaves a half-built context registered with the collector.
    outboundQueueProvider = Preconditions.checkNotNull(primaryConnectionContext.getOutboundQueueProvider());

    txChainManager = new TransactionChainManager(dataBroker, hashedWheelTimer, 500L, 500L);
    auxiliaryConnectionContexts = new HashMap<>();
    deviceFlowRegistry = new DeviceFlowRegistryImpl();
    deviceGroupRegistry = new DeviceGroupRegistryImpl();
    deviceMeterRegistry = new DeviceMeterRegistryImpl();
    bumperQueue = new ArrayBlockingQueue<>(5000);
    multiMsgCollector.setDeviceReplyProcessor(this);
}
159 * This method is called from {@link DeviceManagerImpl} only. So we could say "posthandshake process finish"
160 * and we are able to set a scheduler for an automatic transaction submitting by time (0,5sec).
162 void submitTransaction() {
163 txChainManager.enableSubmit();
164 txChainManager.submitTransaction();
168 public <M extends ChildOf<DataObject>> void onMessage(final M message, final RequestContext requestContext) {
169 // TODO Auto-generated method stub
174 public void addAuxiliaryConenctionContext(final ConnectionContext connectionContext) {
175 final SwitchConnectionDistinguisher connectionDistinguisher = createConnectionDistinguisher(connectionContext);
176 auxiliaryConnectionContexts.put(connectionDistinguisher, connectionContext);
179 private static SwitchConnectionDistinguisher createConnectionDistinguisher(final ConnectionContext connectionContext) {
180 return new SwitchConnectionCookieOFImpl(connectionContext.getFeatures().getAuxiliaryId());
184 public void removeAuxiliaryConenctionContext(final ConnectionContext connectionContext) {
185 // TODO Auto-generated method stub
189 public DeviceState getDeviceState() {
194 public ReadTransaction getReadTransaction() {
195 return dataBroker.newReadOnlyTransaction();
199 public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
200 final InstanceIdentifier<T> path, final T data) {
201 txChainManager.writeToTransaction(store, path, data);
205 public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store, final InstanceIdentifier<T> path) {
206 txChainManager.addDeleteOperationTotTxChain(store, path);
210 public ConnectionContext getPrimaryConnectionContext() {
211 return primaryConnectionContext;
215 public ConnectionContext getAuxiliaryConnectiobContexts(final BigInteger cookie) {
216 return auxiliaryConnectionContexts.get(new SwitchConnectionCookieOFImpl(cookie.longValue()));
220 public RequestContext lookupRequest(final Xid xid) {
221 synchronized (requests) {
222 return requests.get(xid.getValue());
227 public int getNumberOfOutstandingRequests() {
228 synchronized (requests) {
229 return requests.size();
234 public void hookRequestCtx(final Xid xid, final RequestContext requestFutureContext) {
235 synchronized (requests) {
236 requests.put(xid.getValue(), requestFutureContext);
241 public RequestContext unhookRequestCtx(final Xid xid) {
242 synchronized (requests) {
243 return requests.remove(xid.getValue());
248 public DeviceFlowRegistry getDeviceFlowRegistry() {
249 return deviceFlowRegistry;
253 public DeviceGroupRegistry getDeviceGroupRegistry() {
254 return deviceGroupRegistry;
258 public DeviceMeterRegistry getDeviceMeterRegistry() {
259 return deviceMeterRegistry;
263 public void processReply(final OfHeader ofHeader) {
264 final RequestContext requestContext = requests.remove(ofHeader.getXid());
265 if (null != requestContext) {
266 final SettableFuture replyFuture = requestContext.getFuture();
267 RpcResult<OfHeader> rpcResult;
268 if (ofHeader instanceof Error) {
269 //TODO : this is the point, where we can discover that add flow operation failed and where we should
270 //TODO : remove this flow from deviceFlowRegistry
271 final Error error = (Error) ofHeader;
272 final String message = "Operation on device failed with xid " + ofHeader.getXid() + ".";
273 rpcResult = RpcResultBuilder
275 .withError(RpcError.ErrorType.APPLICATION, message, new DeviceDataException(message, error))
277 messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
279 rpcResult = RpcResultBuilder
281 .withResult(ofHeader)
283 messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
286 replyFuture.set(rpcResult);
288 requestContext.close();
289 } catch (final Exception e) {
290 LOG.warn("Closing RequestContext failed: {}", e.getMessage());
291 LOG.debug("Closing RequestContext failed.. ", e);
294 LOG.warn("Can't find request context registered for xid : {}. Type of reply: {}. From address: {}", ofHeader.getXid(), ofHeader.getClass().getName(),
295 getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress());
300 public void processReply(final Xid xid, final List<MultipartReply> ofHeaderList) {
301 final RequestContext requestContext;
302 synchronized (requests) {
303 requestContext = requests.remove(xid.getValue());
305 if (null != requestContext) {
306 final SettableFuture replyFuture = requestContext.getFuture();
307 final RpcResult<List<MultipartReply>> rpcResult = RpcResultBuilder
308 .<List<MultipartReply>>success()
309 .withResult(ofHeaderList)
311 replyFuture.set(rpcResult);
312 for (final MultipartReply multipartReply : ofHeaderList) {
313 messageSpy.spyMessage(multipartReply.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
316 unhookRequestCtx(xid);
318 requestContext.close();
319 } catch (final Exception e) {
320 LOG.warn("Closing RequestContext failed: {}", e.getMessage());
321 LOG.debug("Closing RequestContext failed.. ", e);
324 LOG.warn("Can't find request context registered for xid : {}. Type of reply: MULTIPART. From address: {}", xid.getValue(),
325 getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress());
330 public void processException(final Xid xid, final DeviceDataException deviceDataException) {
332 LOG.trace("Processing exception for xid : {}", xid.getValue());
334 final RequestContext requestContext = requests.remove(xid.getValue());
336 if (null != requestContext) {
337 final SettableFuture replyFuture = requestContext.getFuture();
338 final RpcResult<List<OfHeader>> rpcResult = RpcResultBuilder
339 .<List<OfHeader>>failed()
340 .withError(RpcError.ErrorType.APPLICATION, String.format("Message processing failed : %s", deviceDataException.getError()), deviceDataException)
342 replyFuture.set(rpcResult);
343 messageSpy.spyMessage(deviceDataException.getClass(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
345 requestContext.close();
346 } catch (final Exception e) {
347 LOG.warn("Closing RequestContext failed: ", e);
348 LOG.debug("Closing RequestContext failed..", e);
351 LOG.warn("Can't find request context registered for xid : {}. Exception message {}",
352 xid.getValue(), deviceDataException.getMessage());
357 public void processFlowRemovedMessage(final FlowRemoved flowRemoved) {
358 //TODO: will be defined later
362 public void processPortStatusMessage(final PortStatusMessage portStatus) {
363 messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
364 final TranslatorKey translatorKey = new TranslatorKey(portStatus.getVersion(), PortGrouping.class.getName());
365 final MessageTranslator<PortGrouping, FlowCapableNodeConnector> messageTranslator = translatorLibrary.lookupTranslator(translatorKey);
366 final FlowCapableNodeConnector flowCapableNodeConnector = messageTranslator.translate(portStatus, this, null);
368 final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = provideIIToNodeConnector(portStatus.getPortNo(), portStatus.getVersion());
369 if (portStatus.getReason().equals(PortReason.OFPPRADD) || portStatus.getReason().equals(PortReason.OFPPRMODIFY)) {
370 // because of ADD status node connector has to be created
371 final NodeConnectorBuilder nConnectorBuilder = new NodeConnectorBuilder().setKey(iiToNodeConnector.getKey());
372 nConnectorBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build());
373 nConnectorBuilder.addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector);
374 writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, nConnectorBuilder.build());
375 } else if (portStatus.getReason().equals(PortReason.OFPPRDELETE)) {
376 addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
380 private KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> provideIIToNodeConnector(final long portNo, final short version) {
381 final InstanceIdentifier<Node> iiToNodes = deviceState.getNodeInstanceIdentifier();
382 final BigInteger dataPathId = deviceState.getFeatures().getDatapathId();
383 final NodeConnectorId nodeConnectorId = NodeStaticReplyTranslatorUtil.nodeConnectorId(dataPathId.toString(), portNo, version);
384 return iiToNodes.child(NodeConnector.class, new NodeConnectorKey(nodeConnectorId));
// Handles a packet-in message: counts it in the message spy, translates it to a PacketReceived
// notification and offers that notification to notificationPublishService.
// NOTE(review): several short lines of this method (closing braces, else/try keywords, the
// declaration of x, and possibly an early return) are not visible in this listing, so the exact
// nesting of the branches below is unconfirmed. The comments describe only what the visible
// statements do; restore the missing lines from the original file before changing the logic.
388 public void processPacketInMessage(final PacketInMessage packetInMessage) {
// Every incoming packet-in is counted before translation is attempted.
389 messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH);
// The translator is looked up by the protocol version of the message and the PacketIn class name.
391 final TranslatorKey translatorKey = new TranslatorKey(packetInMessage.getVersion(), PacketIn.class.getName());
392 final MessageTranslator<PacketInMessage, PacketReceived> messageTranslator = translatorLibrary.lookupTranslator(translatorKey);
393 final PacketReceived packetReceived = messageTranslator.translate(packetInMessage, this, null);
// A non-null translation is counted as a translation success ...
395 if (packetReceived != null) {
396 messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
// ... and this statement counts a translation failure (presumably the else branch - TODO confirm).
398 messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
// Offer the translated notification; the returned future is compared with the REJECTED constant.
402 ListenableFuture<?> listenableFuture = notificationPublishService.offerNotification(packetReceived);
403 if (NotificationPublishService.REJECTED.equals(listenableFuture)) {
404 LOG.debug("notification offer rejected");
405 messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
// The future is only read when it is already done, so get() should not block here.
406 } else if (listenableFuture.isDone()) {
409 x = listenableFuture.get();
410 messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
// NOTE(review): the interrupt is only logged; the thread's interrupt flag is not restored here.
411 } catch (InterruptedException e) {
412 LOG.debug("notification offer interrupted: {}", e.getMessage());
413 LOG.trace("notification offer interrupted..", e);
414 } catch (ExecutionException e) {
415 LOG.debug("notification offer failed: {}", e.getMessage());
416 LOG.trace("notification offer failed..", e);
// NOTE(review): the conditions guarding the next two counters are on lines not visible here.
419 messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
423 messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
427 private void applyThrottling(final PacketReceived packetReceived, final ConnectionAdapter connectionAdapter) {
428 final InetSocketAddress remoteAddress = connectionAdapter.getRemoteAddress();
429 LOG.debug("Notification offer refused by notification service.");
430 messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
431 connectionAdapter.setAutoRead(false);
433 LOG.debug("Throttling ingress for {}", remoteAddress);
434 final ListenableFuture<Void> queueDone;
436 // adding first notification
437 bumperQueue.offer(packetReceived);
438 synchronized (bumperQueue) {
439 queueDone = throttledConnectionsHolder.applyThrottlingOnConnection(bumperQueue);
441 Futures.addCallback(queueDone, new FutureCallback<Void>() {
443 public void onSuccess(final Void result) {
444 LOG.debug("Un - throttling ingress for {}", remoteAddress);
445 connectionAdapter.setAutoRead(true);
449 public void onFailure(final Throwable t) {
450 LOG.warn("failed to offer queued notification for {}: {}", remoteAddress, t.getMessage());
451 LOG.debug("failed to offer queued notification for {}.. ", remoteAddress, t);
457 public TranslatorLibrary oook() {
458 return translatorLibrary;
462 public void setTranslatorLibrary(final TranslatorLibrary translatorLibrary) {
463 this.translatorLibrary = translatorLibrary;
467 public HashedWheelTimer getTimer() {
468 return hashedWheelTimer;
472 public void close() throws Exception {
473 deviceState.setValid(false);
475 LOG.trace("Removing node {} from operational DS.", getDeviceState().getNodeId());
476 addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, getDeviceState().getNodeInstanceIdentifier());
478 deviceGroupRegistry.close();
479 deviceFlowRegistry.close();
480 deviceMeterRegistry.close();
482 if (primaryConnectionContext.getConnectionAdapter().isAlive()) {
483 primaryConnectionContext.setConnectionState(ConnectionContext.CONNECTION_STATE.RIP);
484 primaryConnectionContext.getConnectionAdapter().disconnect();
486 for (final Map.Entry<Long, RequestContext> entry : requests.entrySet()) {
487 RequestContextUtil.closeRequestContextWithRpcError(entry.getValue(), DEVICE_DISCONNECTED);
489 for (final ConnectionContext connectionContext : auxiliaryConnectionContexts.values()) {
490 if (connectionContext.getConnectionAdapter().isAlive()) {
491 connectionContext.getConnectionAdapter().disconnect();
494 for (final DeviceContextClosedHandler deviceContextClosedHandler : closeHandlers) {
495 deviceContextClosedHandler.onDeviceContextClosed(this);
501 public void onDeviceDisconnected(final ConnectionContext connectionContext) {
502 if (this.getPrimaryConnectionContext().equals(connectionContext)) {
505 } catch (final Exception e) {
506 LOG.trace("Error closing device context.");
508 if (null != deviceDisconnectedHandler) {
509 deviceDisconnectedHandler.onDeviceDisconnected(connectionContext);
512 final SwitchConnectionDistinguisher connectionDistinguisher = createConnectionDistinguisher(connectionContext);
513 auxiliaryConnectionContexts.remove(connectionDistinguisher);
518 public RequestContext extractNextOutstandingMessage(final long barrierXid) {
519 RequestContext nextMessage = null;
520 synchronized (requests) {
521 final Iterator<Long> keyIterator = requests.keySet().iterator();
522 if (keyIterator.hasNext()) {
523 final Long oldestXid = keyIterator.next();
524 if (oldestXid < barrierXid) {
525 nextMessage = requests.remove(oldestXid);
533 public void setCurrentBarrierTimeout(final Timeout timeout) {
534 barrierTaskTimeout = timeout;
538 public Timeout getBarrierTaskTimeout() {
539 return barrierTaskTimeout;
543 public void setNotificationService(final NotificationService notificationServiceParam) {
544 notificationService = notificationServiceParam;
548 public void setNotificationPublishService(final NotificationPublishService notificationPublishService) {
549 this.notificationPublishService = notificationPublishService;
553 public MessageSpy getMessageSpy() {
558 public void setDeviceDisconnectedHandler(final DeviceDisconnectedHandler deviceDisconnectedHandler) {
559 this.deviceDisconnectedHandler = deviceDisconnectedHandler;
563 public void addDeviceContextClosedHandler(final DeviceContextClosedHandler deviceContextClosedHandler) {
564 this.closeHandlers.add(deviceContextClosedHandler);
568 public void startGatheringOperationsToOneTransaction() {
569 txChainManager.startGatheringOperationsToOneTransaction();
573 public void commitOperationsGatheredInOneTransaction() {
574 txChainManager.commitOperationsGatheredInOneTransaction();