import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberReachableEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.MemberUnreachableEvent;
import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorCommand;
+import org.opendaylight.controller.eos.akka.owner.supervisor.command.OwnerSupervisorReply;
import org.opendaylight.controller.eos.akka.registry.candidate.command.CandidateRegistryCommand;
import org.opendaylight.controller.eos.akka.registry.candidate.command.RegisterCandidate;
import org.opendaylight.controller.eos.akka.registry.candidate.command.UnregisterCandidate;
import org.opendaylight.controller.eos.akka.registry.listener.type.command.RegisterListener;
import org.opendaylight.controller.eos.akka.registry.listener.type.command.TypeListenerRegistryCommand;
+import org.opendaylight.mdsal.binding.dom.codec.impl.BindingCodecContext;
+import org.opendaylight.mdsal.binding.generator.impl.DefaultBindingRuntimeGenerator;
+import org.opendaylight.mdsal.binding.runtime.api.BindingRuntimeGenerator;
+import org.opendaylight.mdsal.binding.runtime.spi.BindingRuntimeHelpers;
import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
"akka://ClusterSystem@127.0.0.1:2552",
"akka://ClusterSystem@127.0.0.1:2553");
+ private static final BindingRuntimeGenerator BINDING_RUNTIME_GENERATOR = new DefaultBindingRuntimeGenerator();
+
+ protected static BindingCodecContext CODEC_CONTEXT
+ = new BindingCodecContext(BindingRuntimeHelpers.createRuntimeContext());
+
private static final String REMOTE_PROTOCOL = "akka";
private static final String PORT_PARAM = "akka.remote.artery.canonical.port";
private static final String ROLE_PARAM = "akka.cluster.roles";
private static final String SEED_NODES_PARAM = "akka.cluster.seed-nodes";
private static final String DATA_CENTER_PARAM = "akka.cluster.multi-data-center.self-data-center";
+ protected static MockNativeEntityOwnershipService startupNativeService(final int port, List<String> roles,
+ final List<String> seedNodes)
+ throws ExecutionException, InterruptedException {
+ final Map<String, Object> overrides = new HashMap<>();
+ overrides.put(PORT_PARAM, port);
+ overrides.put(ROLE_PARAM, roles);
+ if (!seedNodes.isEmpty()) {
+ overrides.put(SEED_NODES_PARAM, seedNodes);
+ }
+
+ final Config config = ConfigFactory.parseMap(overrides)
+ .withFallback(ConfigFactory.load());
+
+ // Create a classic Akka system since thats what we will have in osgi
+ final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
+
+ return new MockNativeEntityOwnershipService(system);
+ }
protected static ClusterNode startupRemote(final int port, final List<String> roles)
throws ExecutionException, InterruptedException {
// Create a classic Akka system since thats what we will have in osgi
final akka.actor.ActorSystem system = akka.actor.ActorSystem.create("ClusterSystem", config);
final ActorRef<BootstrapCommand> eosBootstrap =
- Adapter.spawn(system, EOSMain.create(), "EOSBootstrap");
+ Adapter.spawn(system, EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec()), "EOSBootstrap");
final CompletionStage<RunningContext> ask = AskPattern.ask(eosBootstrap,
GetRunningContext::new,
}
private static Behavior<BootstrapCommand> rootBehavior() {
- return Behaviors.setup(context -> EOSMain.create());
+ return Behaviors.setup(context -> EOSMain.create(CODEC_CONTEXT.getInstanceIdentifierCodec()));
}
protected static void registerCandidates(final ClusterNode node, final DOMEntity entity, final String... members) {
});
}
- protected static void activateDatacenter(final ClusterNode clusterNode) {
- clusterNode.getOwnerSupervisor().tell(ActivateDataCenter.INSTANCE);
+ protected static CompletableFuture<OwnerSupervisorReply> activateDatacenter(final ClusterNode clusterNode) {
+ final CompletionStage<OwnerSupervisorReply> ask =
+ AskPattern.ask(clusterNode.getOwnerSupervisor(),
+ ActivateDataCenter::new,
+ Duration.ofSeconds(20),
+ clusterNode.actorSystem.scheduler());
+ return ask.toCompletableFuture();
}
- protected static void deactivateDatacenter(final ClusterNode clusterNode) {
- clusterNode.getOwnerSupervisor().tell(DeactivateDataCenter.INSTANCE);
+ protected static CompletableFuture<OwnerSupervisorReply> deactivateDatacenter(final ClusterNode clusterNode) {
+ final CompletionStage<OwnerSupervisorReply> ask =
+ AskPattern.ask(clusterNode.getOwnerSupervisor(),
+ DeactivateDataCenter::new,
+ Duration.ofSeconds(20),
+ clusterNode.actorSystem.scheduler());
+ return ask.toCompletableFuture();
}
protected static void verifyListenerState(final MockEntityOwnershipListener listener, final DOMEntity entity,
}
protected static void verifyNoNotifications(final MockEntityOwnershipListener listener) {
- await().pollDelay(2, TimeUnit.SECONDS).until(() -> listener.getChanges().isEmpty());
+ verifyNoNotifications(listener, 2);
+ }
+
+ protected static void verifyNoNotifications(final MockEntityOwnershipListener listener, long delaySeconds) {
+ await().pollDelay(delaySeconds, TimeUnit.SECONDS).until(() -> listener.getChanges().isEmpty());
+ }
+
+ protected static void verifyNoAdditionalNotifications(
+ final MockEntityOwnershipListener listener, long delaySeconds) {
+ listener.resetListener();
+ verifyNoNotifications(listener, delaySeconds);
}
protected static final class ClusterNode {
public List<DOMEntityOwnershipChange> getChanges() {
return changes;
}
+
+ public void resetListener() {
+ changes.clear();
+ }
+ }
+
+ protected static final class MockNativeEntityOwnershipService extends AkkaEntityOwnershipService {
+ private ActorSystem classicActorSystem;
+
+ protected MockNativeEntityOwnershipService(ActorSystem classicActorSystem)
+ throws ExecutionException, InterruptedException {
+ super(classicActorSystem, CODEC_CONTEXT);
+ this.classicActorSystem = classicActorSystem;
+ }
+
+ protected void reachableMember(final String... role) {
+ AbstractNativeEosTest.reachableMember(ownerSupervisor, role);
+ }
+
+ public void unreachableMember(final String... role) {
+ AbstractNativeEosTest.unreachableMember(ownerSupervisor, role);
+ }
+
+ public ActorSystem getActorSystem() {
+ return classicActorSystem;
+ }
}
}