BUG-4084: Li:Save flows in operational based on barrier success
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / rpc / RpcContextImpl.java
index 71d48288e37a7c499943119ff3d2152e5632ab36..502ac137241296d82d4b68fa8ad3db3464b0aece 100644 (file)
@@ -7,42 +7,37 @@
  */
 package org.opendaylight.openflowplugin.impl.rpc;
 
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.ArrayList;
+import com.google.common.base.Preconditions;
 import java.util.Collection;
 import java.util.HashSet;
-import javax.annotation.concurrent.GuardedBy;
+import java.util.concurrent.Semaphore;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
-import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
+import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleSource;
 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
+import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.RpcService;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RpcContextImpl implements RpcContext {
-    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(RpcContextImpl.class);
-    final RpcProviderRegistry rpcProviderRegistry;
+    private static final Logger LOG = LoggerFactory.getLogger(RpcContextImpl.class);
+    private final RpcProviderRegistry rpcProviderRegistry;
+    private final DeviceContext deviceContext;
+    private final MessageSpy messageSpy;
+    private final Semaphore tracker;
 
     // TODO: add private Sal salBroker
-    private final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier;
     private final Collection<RoutedRpcRegistration<?>> rpcRegistrations = new HashSet<>();
 
-    @GuardedBy("requestsList")
-    private final Collection<RequestContext<?>> requestsList = new HashSet<RequestContext<?>>();
-
-    private int maxRequestsPerDevice;
-
-    public RpcContextImpl(final RpcProviderRegistry rpcProviderRegistry, final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier) {
+    public RpcContextImpl(final MessageSpy messageSpy, final RpcProviderRegistry rpcProviderRegistry, final DeviceContext deviceContext, final int maxRequests) {
+        this.messageSpy = messageSpy;
         this.rpcProviderRegistry = rpcProviderRegistry;
-        this.nodeInstanceIdentifier = nodeInstanceIdentifier;
+        this.deviceContext = Preconditions.checkNotNull(deviceContext);
+        tracker = new Semaphore(maxRequests, true);
     }
 
     /**
@@ -53,33 +48,14 @@ public class RpcContextImpl implements RpcContext {
     public <S extends RpcService> void registerRpcServiceImplementation(final Class<S> serviceClass,
                                                                         final S serviceInstance) {
         final RoutedRpcRegistration<S> routedRpcReg = rpcProviderRegistry.addRoutedRpcImplementation(serviceClass, serviceInstance);
-        routedRpcReg.registerPath(NodeContext.class, nodeInstanceIdentifier);
+        routedRpcReg.registerPath(NodeContext.class, deviceContext.getDeviceState().getNodeInstanceIdentifier());
         rpcRegistrations.add(routedRpcReg);
-        LOG.debug("Registration of service {} for device {}.", serviceClass, nodeInstanceIdentifier);
-    }
+        LOG.debug("Registration of service {} for device {}.", serviceClass, deviceContext.getDeviceState().getNodeInstanceIdentifier());
 
-    @Override
-    public <T> SettableFuture<RpcResult<T>> storeOrFail(final RequestContext<T> requestContext) {
-        final SettableFuture<RpcResult<T>> rpcResultFuture = requestContext.getFuture();
-
-        final boolean success;
-        // FIXME: use a fixed-size collection, with lockless reserve/set queue
-        synchronized (requestsList) {
-            if (requestsList.size() < maxRequestsPerDevice) {
-                requestsList.add(requestContext);
-                success = true;
-            } else {
-                success = false;
-            }
+        if (serviceInstance instanceof ItemLifeCycleSource) {
+            // TODO: collect registration for selective unregistering in case of tearing down only one rpc
+            deviceContext.getItemLifeCycleSourceRegistry().registerLifeCycleSource((ItemLifeCycleSource) serviceInstance);
         }
-
-        if (!success) {
-            final RpcResult<T> rpcResult = RpcResultBuilder.<T>failed()
-                    .withError(RpcError.ErrorType.APPLICATION, "", "Device's request queue is full.").build();
-            rpcResultFuture.set(rpcResult);
-        }
-
-        return rpcResultFuture;
     }
 
     /**
@@ -88,43 +64,32 @@ public class RpcContextImpl implements RpcContext {
      * @see java.lang.AutoCloseable#close()
      */
     @Override
-    public void close() throws Exception {
+    public void close() {
         for (final RoutedRpcRegistration<?> rpcRegistration : rpcRegistrations) {
-            rpcRegistration.unregisterPath(NodeContext.class, nodeInstanceIdentifier);
+            rpcRegistration.unregisterPath(NodeContext.class, deviceContext.getDeviceState().getNodeInstanceIdentifier());
             rpcRegistration.close();
         }
     }
 
-    /**
-     * @see org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext#setRequestContextQuota(int)
-     */
-    @Override
-    public void setRequestContextQuota(final int maxRequestsPerDevice) {
-        this.maxRequestsPerDevice = maxRequestsPerDevice;
-    }
-
     @Override
-    public <T> void forgetRequestContext(final RequestContext<T> requestContext) {
-        synchronized (requestsList) {
-            requestsList.remove(requestContext);
-            LOG.trace("Removed request context with xid {}. Context request in list {}.",
-                requestContext.getXid().getValue(), requestsList.size());
+    public <T> RequestContext<T> createRequestContext() {
+        if (!tracker.tryAcquire()) {
+            LOG.trace("Device queue {} at capacity", this);
+            return null;
         }
-    }
 
-    @Override
-    public <T> RequestContext<T> createRequestContext() {
-        return new RequestContextImpl<T>(this);
+        return new AbstractRequestContext<T>(deviceContext.getReservedXid()) {
+            @Override
+            public void close() {
+                tracker.release();
+                LOG.trace("Removed request context with xid {}", getXid().getValue());
+                messageSpy.spyMessage(RpcContextImpl.class, MessageSpy.STATISTIC_GROUP.REQUEST_STACK_FREED);
+            }
+        };
     }
 
     @Override
-    public void onDeviceDisconnected(final ConnectionContext connectionContext) {
-        for (RoutedRpcRegistration<?> registration : rpcRegistrations) {
-            registration.close();
-        }
-
-        synchronized (requestsList) {
-            requestsList.clear();
-        }
+    public void onDeviceContextClosed(DeviceContext deviceContext) {
+        close();
     }
 }