import akka.actor.typed.javadsl.Behaviors;
import akka.cluster.ddata.LWWRegister;
import akka.cluster.ddata.LWWRegisterKey;
+import akka.cluster.ddata.ORMap;
+import akka.cluster.ddata.ORSet;
import akka.cluster.ddata.typed.javadsl.DistributedData;
import akka.cluster.ddata.typed.javadsl.Replicator;
import com.typesafe.config.Config;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
+import org.opendaylight.controller.eos.akka.registry.candidate.CandidateRegistry;
import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
+import org.opendaylight.mdsal.binding.dom.codec.impl.BindingCodecContext;
+import org.opendaylight.mdsal.binding.generator.impl.DefaultBindingRuntimeGenerator;
+import org.opendaylight.mdsal.binding.runtime.api.BindingRuntimeGenerator;
+import org.opendaylight.mdsal.binding.runtime.spi.BindingRuntimeHelpers;
import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
"akka://ClusterSystem@127.0.0.1:2552",
"akka://ClusterSystem@127.0.0.1:2553");
+ private static final BindingRuntimeGenerator BINDING_RUNTIME_GENERATOR = new DefaultBindingRuntimeGenerator();
+
+ protected static BindingCodecContext CODEC_CONTEXT
+ = new BindingCodecContext(BindingRuntimeHelpers.createRuntimeContext());
+
private static final String REMOTE_PROTOCOL = "akka";
private static final String PORT_PARAM = "akka.remote.artery.canonical.port";
private static final String ROLE_PARAM = "akka.cluster.roles";
private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes";
private static final String DATA_CENTER_PARAM = "akka.cluster.multi-data-center.self-data-center";
- protected static MockNativeEntityOwnershipService startupNativeService(final int port, List<String> roles,
+ protected static MockNativeEntityOwnershipService startupNativeService(final int port, final List<String> roles,
final List<String> seedNodes)
throws ExecutionException, InterruptedException {
final Map<String, Object> overrides = new HashMap<>();
protected static ClusterNode startupWithDatacenter(final int port, final List<String> roles,
final List<String> seedNodes, final String dataCenter)
throws ExecutionException, InterruptedException {
+ final akka.actor.ActorSystem system = startupActorSystem(port, roles, seedNodes, dataCenter);
+ final ActorRef<BootstrapCommand> eosBootstrap =
+ Adapter.spawn(system, EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec()), "EOSBootstrap");
+
+ final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
+ GetRunningContext::new,
+ Duration.ofSeconds(5),
+ Adapter.toTyped(system.scheduler()));
+ final RunningContext runningContext = ask.toCompletableFuture().get();
+
+ return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
+ runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
+ }
+
+ protected static akka.actor.ActorSystem startupActorSystem(final int port, final List<String> roles,
+ final List<String> seedNodes) {
final Map<String, Object> overrides = new HashMap<>();
overrides.put(PORT_PARAM, port);
overrides.put(ROLE_PARAM, roles);
if (!seedNodes.isEmpty()) {
overrides.put(SEED_NODES_PARAM, seedNodes);
}
- overrides.put(DATA_CENTER_PARAM, dataCenter);
final Config config = ConfigFactory.parseMap(overrides)
.withFallback(ConfigFactory.load());
// Create a classic Akka system since thats what we will have in osgi
- final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
- final ActorRef<BootstrapCommand> eosBootstrap =
- Adapter.spawn(system, EOSMain.create(), "EOSBootstrap");
+ return akka.actor.ActorSystem.create("ClusterSystem", config);
+ }
- final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
- GetRunningContext::new,
- Duration.ofSeconds(5),
- Adapter.toTyped(system.scheduler()));
- final RunningContext runningContext = ask.toCompletableFuture().get();
+ protected static akka.actor.ActorSystem startupActorSystem(final int port, final List<String> roles,
+ final List<String> seedNodes, final String dataCenter) {
+ final Map<String, Object> overrides = new HashMap<>();
+ overrides.put(PORT_PARAM, port);
+ overrides.put(ROLE_PARAM, roles);
+ if (!seedNodes.isEmpty()) {
+ overrides.put(SEED_NODES_PARAM, seedNodes);
+ }
+ overrides.put(DATA_CENTER_PARAM, dataCenter);
- return new ClusterNode(port, roles, system, eosBootstrap, runningContext.getListenerRegistry(),
- runningContext.getCandidateRegistry(), runningContext.getOwnerSupervisor());
+ final Config config = ConfigFactory.parseMap(overrides)
+ .withFallback(ConfigFactory.load());
+
+ // Create a classic Akka system since thats what we will have in osgi
+ return akka.actor.ActorSystem.create("ClusterSystem", config);
}
private static Behavior<BootstrapCommand> rootBehavior() {
- return Behaviors.setup(context -> EOSMain.create());
+ return Behaviors.setup(context -> EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec()));
}
protected static void registerCandidates(final ClusterNode node, final DOMEntity entity, final String... members) {
}
protected static void waitUntillOwnerPresent(final ClusterNode clusterNode, final DOMEntity entity) {
- await().until(() -> {
+ await().atMost(Duration.ofSeconds(15)).until(() -> {
final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem());
final CompletionStage<Replicator.GetResponse<LWWRegister<String>>> ask =
AskPattern.ask(distributedData.replicator(),
});
}
+ protected static void waitUntillCandidatePresent(final ClusterNode clusterNode, final DOMEntity entity,
+ final String candidate) {
+ await().atMost(Duration.ofSeconds(15)).until(() -> {
+ final DistributedData distributedData = DistributedData.get(clusterNode.getActorSystem());
+
+ final CompletionStage<Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>>> ask =
+ AskPattern.ask(distributedData.replicator(),
+ replyTo -> new Replicator.Get<>(
+ CandidateRegistry.KEY, Replicator.readLocal(), replyTo),
+ Duration.ofSeconds(5),
+ clusterNode.getActorSystem().scheduler());
+
+ final Replicator.GetResponse<ORMap<DOMEntity, ORSet<String>>> response =
+ ask.toCompletableFuture().get(5, TimeUnit.SECONDS);
+
+ if (response instanceof Replicator.GetSuccess) {
+ final Map<DOMEntity, ORSet<String>> entries =
+ ((Replicator.GetSuccess<ORMap<DOMEntity, ORSet<String>>>) response).dataValue().getEntries();
+
+ return entries.get(entity).contains(candidate);
+
+ }
+ return false;
+ });
+ }
+
protected static CompletableFuture<OwnerSupervisorReply> activateDatacenter(final ClusterNode clusterNode) {
final CompletionStage<OwnerSupervisorReply> ask =
AskPattern.ask(clusterNode.getOwnerSupervisor(),
final boolean hasOwner, final boolean isOwner, final boolean wasOwner) {
await().until(() -> !listener.getChanges().isEmpty());
- await().untilAsserted(() -> {
+ await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
final List<DOMEntityOwnershipChange> changes = listener.getChanges();
final DOMEntityOwnershipChange domEntityOwnershipChange = listener.getChanges().get(changes.size() - 1);
assertEquals(entity, domEntityOwnershipChange.getEntity());
verifyNoNotifications(listener, 2);
}
- protected static void verifyNoNotifications(final MockEntityOwnershipListener listener, long delaySeconds) {
+ protected static void verifyNoNotifications(final MockEntityOwnershipListener listener, final long delaySeconds) {
await().pollDelay(delaySeconds, TimeUnit.SECONDS).until(() -> listener.getChanges().isEmpty());
}
protected static void verifyNoAdditionalNotifications(
- final MockEntityOwnershipListener listener, long delaySeconds) {
+ final MockEntityOwnershipListener listener, final long delaySeconds) {
listener.resetListener();
verifyNoNotifications(listener, delaySeconds);
}
}
protected static final class MockNativeEntityOwnershipService extends AkkaEntityOwnershipService {
- private ActorSystem classicActorSystem;
+ private final ActorSystem classicActorSystem;
- protected MockNativeEntityOwnershipService(ActorSystem classicActorSystem)
+ protected MockNativeEntityOwnershipService(final ActorSystem classicActorSystem)
throws ExecutionException, InterruptedException {
- super(classicActorSystem);
+ super(classicActorSystem, CODEC_CONTEXT);
this.classicActorSystem = classicActorSystem;
}