import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
private final DOMDataTreeChangeService domDataTreeChangeService;
private final ActorSystem actorSystem;
- private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
+ private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
new HashMap<>();
- private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
+ private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
private FlappingSingletonService flappingSingletonService;
private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
private IdIntsListener idIntsListener;
- private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
+ private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
private IdIntsDOMDataTreeLIstener idIntsDdtl;
@Override
public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
LOG.debug("write-transactions, input: {}", input);
-
- final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
-
- final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
- writeTransactionsHandler.start(settableFuture);
-
- return settableFuture;
+ return WriteTransactionsHandler.start(domDataBroker, input);
}
@Override
}
@Override
- public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
+ public Future<RpcResult<Void>> removeShardReplica(final RemoveShardReplicaInput input) {
return null;
}
}
@Override
- public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
+ public Future<RpcResult<Void>> registerDefaultConstant(final RegisterDefaultConstantInput input) {
return null;
}
}
@Override
- public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
+ public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
return null;
}
return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed().withRpcError(error).build());
}
+ try {
+ idIntsListener.tryFinishProcessing().get(120, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ final RpcError error = RpcResultBuilder.newError(
+ ErrorType.RPC, "resource-denied-transport", "Unable to finish notification processing in 120 seconds.",
+ "clustering-it", "clustering-it", e);
+ return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
+ .withRpcError(error).build());
+ }
+
dtclReg.close();
dtclReg = null;
@Override
public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
LOG.debug("producer-transactions, input: {}", input);
-
- final ProduceTransactionsHandler handler =
- new ProduceTransactionsHandler(domDataTreeService, input);
-
- final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
- handler.start(settableFuture);
-
- return settableFuture;
+ return ProduceTransactionsHandler.start(domDataTreeService, input);
}
@Override
return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
}
+ return shutdownShardGracefully(shardName);
+ }
+
+ @Override
+ public Future<RpcResult<Void>> shutdownPrefixShardReplica(final ShutdownPrefixShardReplicaInput input) {
+ LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
+
+ final InstanceIdentifier<?> shardPrefix = input.getPrefix();
+
+ if (shardPrefix == null) {
+ final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
+ "A valid shard prefix must be specified");
+ return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
+ }
+
+ final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
+ final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
+
+ return shutdownShardGracefully(cleanPrefixShardName);
+ }
+
+ private SettableFuture<RpcResult<Void>> shutdownShardGracefully(final String shardName) {
final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
final ActorContext context = configDataStore.getActorContext();
}
}
}, context.getClientDispatcher());
-
return rpcResult;
}
return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withRpcError(error).build());
}
+ try {
+ idIntsDdtl.tryFinishProcessing().get(120, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ final RpcError error = RpcResultBuilder.newError(
+ ErrorType.RPC, "resource-denied-transport", "Unable to finish notification processing in 120 seconds.",
+ "clustering-it", "clustering-it", e);
+ return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
+ .withRpcError(error).build());
+ }
+
ddtlReg.close();
ddtlReg = null;