Do not keep per-switch state in DeviceManagerImpl
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceManagerImpl.java
index 37c841230805c22742a5690bfb262adec1804786..c112b0e7393bcccd03267a2572a74ab530dbba26 100644 (file)
@@ -11,7 +11,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 import io.netty.util.HashedWheelTimer;
 import java.math.BigInteger;
 import java.util.ArrayList;
@@ -40,7 +39,6 @@ import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
 import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
-import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
@@ -52,7 +50,7 @@ import org.opendaylight.openflowplugin.impl.common.NodeStaticReplyTranslatorUtil
 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
 import org.opendaylight.openflowplugin.impl.connection.ThrottledNotificationsOffererImpl;
 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
-import org.opendaylight.openflowplugin.impl.rpc.RequestContextImpl;
+import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
 import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeBuilder;
@@ -76,7 +74,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev13
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.MultipartReplyBody;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyDescCase;
@@ -111,12 +108,11 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable {
 
     private final DataBroker dataBroker;
     private final HashedWheelTimer hashedWheelTimer;
-    private RequestContextStack emptyRequestContextStack;
     private TranslatorLibrary translatorLibrary;
     private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
     private NotificationService notificationService;
     private NotificationPublishService notificationPublishService;
-    private ThrottledNotificationsOfferer<PacketInMessage> throttledNotificationsOfferer;
+    private ThrottledNotificationsOfferer throttledNotificationsOfferer;
 
     private final List<DeviceContext> deviceContexts = new ArrayList<DeviceContext>();
     private final MessageIntelligenceAgency messageIntelligenceAgency;
@@ -130,27 +126,13 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable {
         hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, 500);
         /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
-        tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), new NodesBuilder().build());
+
+        final NodesBuilder nodesBuilder = new NodesBuilder();
+        nodesBuilder.setNode(Collections.<Node>emptyList());
+        tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build());
         tx.submit();
 
         this.messageIntelligenceAgency = messageIntelligenceAgency;
-
-        emptyRequestContextStack = new RequestContextStack() {
-            @Override
-            public <T> void forgetRequestContext(final RequestContext<T> requestContext) {
-                //NOOP
-            }
-
-            @Override
-            public <T> SettableFuture<RpcResult<T>> storeOrFail(final RequestContext<T> data) {
-                return data.getFuture();
-            }
-
-            @Override
-            public <T> RequestContext<T> createRequestContext() {
-                return new RequestContextImpl<>(this);
-            }
-        };
     }
 
     @Override
@@ -192,7 +174,6 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable {
         final ListenableFuture<List<RpcResult<List<MultipartReply>>>> deviceFeaturesFuture;
 
 
-
         if (OFConstants.OFP_VERSION_1_0 == version) {
             final CapabilitiesV10 capabilitiesV10 = connectionContext.getFeatures().getCapabilitiesV10();
 
@@ -242,7 +223,7 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable {
         });
     }
 
-    private void chainTableTrunkWriteOF10(final DeviceContext deviceContext, final ListenableFuture<List<RpcResult<List<MultipartReply>>>> deviceFeaturesFuture) {
+    private static void chainTableTrunkWriteOF10(final DeviceContext deviceContext, final ListenableFuture<List<RpcResult<List<MultipartReply>>>> deviceFeaturesFuture) {
         Futures.addCallback(deviceFeaturesFuture, new FutureCallback<List<RpcResult<List<MultipartReply>>>>() {
             @Override
             public void onSuccess(final List<RpcResult<List<MultipartReply>>> results) {
@@ -264,7 +245,6 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable {
         });
     }
 
-
     private ListenableFuture<RpcResult<List<MultipartReply>>> processReplyDesc(final DeviceContext deviceContext,
                                                                                final DeviceState deviceState) {
 
@@ -322,24 +302,31 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable {
         this.translatorLibrary = translatorLibrary;
     }
 
-    private ListenableFuture<RpcResult<List<MultipartReply>>> getNodeStaticInfo(final MultipartType type, final DeviceContext deviceContext,
+    private static ListenableFuture<RpcResult<List<MultipartReply>>> getNodeStaticInfo(final MultipartType type, final DeviceContext deviceContext,
                                                                                 final InstanceIdentifier<Node> nodeII, final short version) {
 
         final OutboundQueue queue = deviceContext.getPrimaryConnectionContext().getOutboundQueueProvider();
-        Long reservedXid = queue.reserveEntry();
-        final Xid xid = new Xid(reservedXid);
 
-        final RequestContext<List<MultipartReply>> requestContext = emptyRequestContextStack.createRequestContext();
-        requestContext.setXid(xid);
+        final Long reserved = deviceContext.getReservedXid();
+        final RequestContext<List<MultipartReply>> requestContext = new AbstractRequestContext<List<MultipartReply>>(reserved) {
+            @Override
+            public void close() {
+                //NOOP
+            }
+        };
+
+        final Xid xid = requestContext.getXid();
 
-        LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue());
-        deviceContext.hookRequestCtx(requestContext.getXid(), requestContext);
+        LOG.trace("Hooking xid {} to device context - precaution.", reserved);
 
         final ListenableFuture<RpcResult<List<MultipartReply>>> requestContextFuture = requestContext.getFuture();
 
         final MultiMsgCollector multiMsgCollector = deviceContext.getMultiMsgCollector();
-        multiMsgCollector.registerMultipartXid(xid.getValue());
-        queue.commitEntry(reservedXid, MultipartRequestInputFactory.makeMultipartRequestInput(xid.getValue(), version, type), new FutureCallback<OfHeader>() {
+        multiMsgCollector.registerMultipartRequestContext(requestContext);
+
+        createSuccessProcessingCallback(type, deviceContext, nodeII, requestContextFuture);
+
+        queue.commitEntry(xid.getValue(), MultipartRequestInputFactory.makeMultipartRequestInput(xid.getValue(), version, type), new FutureCallback<OfHeader>() {
             @Override
             public void onSuccess(final OfHeader ofHeader) {
                 if (ofHeader instanceof MultipartReply) {
@@ -361,6 +348,11 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable {
             }
         });
 
+
+        return requestContext.getFuture();
+    }
+
+    private static void createSuccessProcessingCallback(final MultipartType type, final DeviceContext deviceContext, final InstanceIdentifier<Node> nodeII, final ListenableFuture<RpcResult<List<MultipartReply>>> requestContextFuture) {
         Futures.addCallback(requestContextFuture, new FutureCallback<RpcResult<List<MultipartReply>>>() {
             @Override
             public void onSuccess(final RpcResult<List<MultipartReply>> rpcResult) {
@@ -388,16 +380,6 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable {
                 LOG.info("Request of type {} for static info of node {} failed.", type, nodeII);
             }
         });
-
-
-/*
-        final ListenableFuture<RpcResult<Void>> rpcFuture = JdkFutureAdapters.listenInPoolThread(deviceContext.getPrimaryConnectionContext().getConnectionAdapter()
-                .multipartRequest(MultipartRequestInputFactory.makeMultipartRequestInput(xid.getValue(), version, type)));
-        final OFJResult2RequestCtxFuture OFJResult2RequestCtxFuture = new OFJResult2RequestCtxFuture(requestContext, deviceContext);
-        OFJResult2RequestCtxFuture.processResultFromOfJava(rpcFuture);
-*/
-
-        return requestContext.getFuture();
     }
 
     // FIXME : remove after ovs tableFeatures fix
@@ -520,6 +502,6 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable {
         spyPool = new ScheduledThreadPoolExecutor(1);
         spyPool.scheduleAtFixedRate(messageIntelligenceAgency, spyRate, spyRate, TimeUnit.SECONDS);
 
-        throttledNotificationsOfferer = new ThrottledNotificationsOffererImpl<>(notificationPublishService, messageIntelligenceAgency);
+        throttledNotificationsOfferer = new ThrottledNotificationsOffererImpl(notificationPublishService, messageIntelligenceAgency);
     }
 }