BUG-8494: Cleanup clustering-it-provider
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
index f0569fa07dc4e8842d92504168b117832b99acec..e0e8d99d1aab3eac8df847bf7075de6e15b0257e 100644 (file)
@@ -26,8 +26,10 @@ import java.io.StringWriter;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.opendaylight.controller.cluster.ActorSystemProvider;
 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
@@ -85,6 +87,7 @@ import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.l
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
@@ -134,17 +137,17 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     private final DOMDataTreeChangeService domDataTreeChangeService;
     private final ActorSystem actorSystem;
 
-    private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
+    private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
             new HashMap<>();
 
-    private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
+    private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
 
     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
     private FlappingSingletonService flappingSingletonService;
     private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
     private IdIntsListener idIntsListener;
-    private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
+    private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
     private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
     private IdIntsDOMDataTreeLIstener idIntsDdtl;
 
@@ -249,13 +252,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     @Override
     public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
         LOG.debug("write-transactions, input: {}", input);
-
-        final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
-
-        final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
-        writeTransactionsHandler.start(settableFuture);
-
-        return settableFuture;
+        return WriteTransactionsHandler.start(domDataBroker, input);
     }
 
     @Override
@@ -264,7 +261,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     }
 
     @Override
-    public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
+    public Future<RpcResult<Void>> removeShardReplica(final RemoveShardReplicaInput input) {
         return null;
     }
 
@@ -336,7 +333,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     }
 
     @Override
-    public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
+    public Future<RpcResult<Void>> registerDefaultConstant(final RegisterDefaultConstantInput input) {
         return null;
     }
 
@@ -378,7 +375,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     }
 
     @Override
-    public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
+    public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
         return null;
     }
 
@@ -462,6 +459,16 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed().withRpcError(error).build());
         }
 
+        try {
+            idIntsListener.tryFinishProcessing().get(120, TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            final RpcError error = RpcResultBuilder.newError(
+                    ErrorType.RPC, "resource-denied-transport", "Unable to finish notification processing in 120 seconds.",
+                    "clustering-it", "clustering-it", e);
+            return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
+                    .withRpcError(error).build());
+        }
+
         dtclReg.close();
         dtclReg = null;
 
@@ -560,14 +567,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     @Override
     public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
         LOG.debug("producer-transactions, input: {}", input);
-
-        final ProduceTransactionsHandler handler =
-                new ProduceTransactionsHandler(domDataTreeService, input);
-
-        final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
-        handler.start(settableFuture);
-
-        return settableFuture;
+        return ProduceTransactionsHandler.start(domDataTreeService, input);
     }
 
     @Override
@@ -581,6 +581,28 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
         }
 
+        return shutdownShardGracefully(shardName);
+    }
+
+    @Override
+    public Future<RpcResult<Void>> shutdownPrefixShardReplica(final ShutdownPrefixShardReplicaInput input) {
+        LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
+
+        final InstanceIdentifier<?> shardPrefix = input.getPrefix();
+
+        if (shardPrefix == null) {
+            final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
+                    "A valid shard prefix must be specified");
+            return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
+        }
+
+        final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
+        final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
+
+        return shutdownShardGracefully(cleanPrefixShardName);
+    }
+
+    private SettableFuture<RpcResult<Void>> shutdownShardGracefully(final String shardName) {
         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
         final ActorContext context = configDataStore.getActorContext();
 
@@ -614,7 +636,6 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
                 }
             }
         }, context.getClientDispatcher());
-
         return rpcResult;
     }
 
@@ -654,6 +675,16 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withRpcError(error).build());
         }
 
+        try {
+            idIntsDdtl.tryFinishProcessing().get(120, TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            final RpcError error = RpcResultBuilder.newError(
+                    ErrorType.RPC, "resource-denied-transport", "Unable to finish notification processing in 120 seconds.",
+                    "clustering-it", "clustering-it", e);
+            return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
+                    .withRpcError(error).build());
+        }
+
         ddtlReg.close();
         ddtlReg = null;