import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
import com.google.common.base.Optional;
+import com.google.common.base.Strings;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
private final DOMDataTreeChangeService domDataTreeChangeService;
private final ActorSystem actorSystem;
- private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
+ private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
new HashMap<>();
- private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
+ private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
private FlappingSingletonService flappingSingletonService;
private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
private IdIntsListener idIntsListener;
- private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
+ private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
private IdIntsDOMDataTreeLIstener idIntsDdtl;
@Override
public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
LOG.debug("write-transactions, input: {}", input);
-
- final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
-
- final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
- writeTransactionsHandler.start(settableFuture);
-
- return settableFuture;
+ return WriteTransactionsHandler.start(domDataBroker, input);
}
@Override
}
@Override
- public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
+ public Future<RpcResult<Void>> removeShardReplica(final RemoveShardReplicaInput input) {
return null;
}
}
@Override
- public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
+ public Future<RpcResult<Void>> registerDefaultConstant(final RegisterDefaultConstantInput input) {
return null;
}
}
@Override
- public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
+ public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
return null;
}
if (flappingSingletonService != null) {
final RpcError error = RpcResultBuilder.newError(
- ErrorType.RPC, "Registration present.", "flappin-singleton already registered");
+ ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
}
return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed().withRpcError(error).build());
}
- final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
try {
- if (dtclReg != null) {
- dtclReg.close();
- dtclReg = null;
- }
+ idIntsListener.tryFinishProcessing().get(120, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ final RpcError error = RpcResultBuilder.newError(
+ ErrorType.RPC, "resource-denied-transport", "Unable to finish notification processing in 120 seconds.",
+ "clustering-it", "clustering-it", e);
+ return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
+ .withRpcError(error).build());
+ }
+ dtclReg.close();
+ dtclReg = null;
+ if (!idIntsListener.hasTriggered()) {
+ final RpcError error = RpcResultBuilder.newError(
+ ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
+ "any notifications.");
+ return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
+ .withRpcError(error).build());
+ }
+ final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
+ try {
final Optional<NormalizedNode<?, ?>> readResult =
rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
@Override
public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
LOG.debug("producer-transactions, input: {}", input);
+ return ProduceTransactionsHandler.start(domDataTreeService, input);
+ }
+
+ @Override
+ public Future<RpcResult<Void>> shutdownShardReplica(final ShutdownShardReplicaInput input) {
+ LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
+
+ final String shardName = input.getShardName();
+ if (Strings.isNullOrEmpty(shardName)) {
+ final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
+ "A valid shard name must be specified");
+ return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
+ }
+
+ return shutdownShardGracefully(shardName);
+ }
+
+ @Override
+ public Future<RpcResult<Void>> shutdownPrefixShardReplica(final ShutdownPrefixShardReplicaInput input) {
+ LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
+
+ final InstanceIdentifier<?> shardPrefix = input.getPrefix();
+
+ if (shardPrefix == null) {
+ final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
+ "A valid shard prefix must be specified");
+ return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
+ }
- final ProduceTransactionsHandler handler =
- new ProduceTransactionsHandler(domDataTreeService, input);
+ final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
+ final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
- final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
- handler.start(settableFuture);
+ return shutdownShardGracefully(cleanPrefixShardName);
+ }
- return settableFuture;
+ private SettableFuture<RpcResult<Void>> shutdownShardGracefully(final String shardName) {
+ final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
+ final ActorContext context = configDataStore.getActorContext();
+
+ long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
+ .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
+ final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
+ final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
+
+ context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
+ @Override
+ public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable {
+ if (throwable != null) {
+ shutdownShardAsk.failure(throwable);
+ } else {
+ shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
+ }
+ }
+ }, context.getClientDispatcher());
+
+ shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
+ @Override
+ public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) throws Throwable {
+ if (throwable != null) {
+ final RpcResult<Void> failedResult = RpcResultBuilder.<Void>failed()
+ .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
+ rpcResult.set(failedResult);
+ } else {
+ // according to Patterns.gracefulStop API, we don't have to
+ // check value of gracefulStopResult
+ rpcResult.set(RpcResultBuilder.<Void>success().build());
+ }
+ }
+ }, context.getClientDispatcher());
+ return rpcResult;
}
@Override
return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withRpcError(error).build());
}
+ try {
+ idIntsDdtl.tryFinishProcessing().get(120, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ final RpcError error = RpcResultBuilder.newError(
+ ErrorType.RPC, "resource-denied-transport", "Unable to finish notification processing in 120 seconds.",
+ "clustering-it", "clustering-it", e);
+ return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
+ .withRpcError(error).build());
+ }
+
ddtlReg.close();
ddtlReg = null;
+ if (!idIntsDdtl.hasTriggered()) {
+ final RpcError error = RpcResultBuilder.newError(
+ ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
+ "any notifications.");
+ return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
+ .withRpcError(error).build());
+ }
+
final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
LOG.debug("Creating distributed datastore client for shard {}", shardName);