From d69af1f79ae4630be8c4d65b98096aa27b1665b6 Mon Sep 17 00:00:00 2001 From: Jakub Morvay Date: Mon, 6 Mar 2017 13:54:31 +0100 Subject: [PATCH] Bug 7407 - CDS: allow applications to request Leader movement This patch provides the routing from cds-dom-api CDSShardAccess to the backend RaftActor. Change-Id: I9fa315034d95a1896393a6152147a7bc50829b2a Signed-off-by: Jakub Morvay Signed-off-by: Robert Varga --- .../LeadershipTransferFailedException.java | 4 + .../cluster/sharding/CDSShardAccessImpl.java | 218 ++++++++++++++++++ .../DistributedShardedDOMDataTree.java | 37 ++- .../sharding/RoleChangeListenerActor.java | 71 ++++++ .../sharding/CDSShardAccessImplTest.java | 217 +++++++++++++++++ .../sharding/RoleChangeListenerActorTest.java | 62 +++++ 6 files changed, 606 insertions(+), 3 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/CDSShardAccessImpl.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/RoleChangeListenerActor.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/CDSShardAccessImplTest.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/RoleChangeListenerActorTest.java diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/LeadershipTransferFailedException.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/LeadershipTransferFailedException.java index ce4cfcd908..9d971d5036 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/LeadershipTransferFailedException.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/LeadershipTransferFailedException.java @@ -14,4 +14,8 @@ public class LeadershipTransferFailedException extends Exception { public LeadershipTransferFailedException(final String message) { super(message); } + + public LeadershipTransferFailedException(final String message, final Throwable cause) { + super(message, cause); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/CDSShardAccessImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/CDSShardAccessImpl.java new file mode 100644 index 0000000000..bc90716af7 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/CDSShardAccessImpl.java @@ -0,0 +1,218 @@ +/* + * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.sharding; + +import static akka.actor.ActorRef.noSender; + +import akka.actor.ActorRef; +import akka.actor.PoisonPill; +import akka.dispatch.Futures; +import akka.dispatch.Mapper; +import akka.dispatch.OnComplete; +import akka.util.Timeout; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import java.util.Collection; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.Nonnull; +import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException; +import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; +import org.opendaylight.controller.cluster.dom.api.CDSShardAccess; +import org.opendaylight.controller.cluster.dom.api.LeaderLocation; +import org.opendaylight.controller.cluster.dom.api.LeaderLocationListener; +import org.opendaylight.controller.cluster.dom.api.LeaderLocationListenerRegistration; +import org.opendaylight.controller.cluster.raft.LeadershipTransferFailedException; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.compat.java8.FutureConverters; +import scala.concurrent.Future; + + +/** + * Default {@link CDSShardAccess} implementation. Listens on leader location + * change events and distributes them to registered listeners. Also updates + * current information about leader location accordingly. + * + *

+ * Sends {@link MakeLeaderLocal} message to local shards and translates its result + * on behalf users {@link #makeLeaderLocal()} calls. + * + *

+ * {@link org.opendaylight.controller.cluster.dom.api.CDSDataTreeProducer} that + * creates instances of this class has to call {@link #close()} once it is no + * longer valid. + */ +final class CDSShardAccessImpl implements CDSShardAccess, LeaderLocationListener, AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(CDSShardAccessImpl.class); + + private final Collection listeners = ConcurrentHashMap.newKeySet(); + private final DOMDataTreeIdentifier prefix; + private final ActorContext actorContext; + private final Timeout makeLeaderLocalTimeout; + + private ActorRef roleChangeListenerActor; + + private volatile LeaderLocation currentLeader = LeaderLocation.UNKNOWN; + private volatile boolean closed = false; + + CDSShardAccessImpl(final DOMDataTreeIdentifier prefix, final ActorContext actorContext) { + this.prefix = Preconditions.checkNotNull(prefix); + this.actorContext = Preconditions.checkNotNull(actorContext); + this.makeLeaderLocalTimeout = + new Timeout(actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().$times(2)); + + // register RoleChangeListenerActor + // TODO Maybe we should do this in async + final Optional localShardReply = + actorContext.findLocalShard(ClusterUtils.getCleanShardName(prefix.getRootIdentifier())); + Preconditions.checkState(localShardReply.isPresent(), + "Local shard for {} not present. Cannot register RoleChangeListenerActor", prefix); + roleChangeListenerActor = + actorContext.getActorSystem().actorOf(RoleChangeListenerActor.props(localShardReply.get(), this)); + } + + private void checkNotClosed() { + Preconditions.checkState(!closed, + "CDSDataTreeProducer, that this CDSShardAccess is associated with, is no longer valid"); + } + + @Override + @Nonnull + public DOMDataTreeIdentifier getShardIdentifier() { + checkNotClosed(); + return prefix; + } + + @Override + @Nonnull + public LeaderLocation getLeaderLocation() { + checkNotClosed(); + // TODO before getting first notification from roleChangeListenerActor + // we will always return UNKNOWN + return currentLeader; + } + + @Override + @Nonnull + public CompletionStage makeLeaderLocal() { + // TODO when we have running make leader local operation + // we should just return the same completion stage + checkNotClosed(); + + // TODO can we cache local shard actorRef? + final Future localShardReply = + actorContext.findLocalShardAsync(ClusterUtils.getCleanShardName(prefix.getRootIdentifier())); + + // we have to tell local shard to make leader local + final scala.concurrent.Promise makeLeaderLocalAsk = Futures.promise(); + localShardReply.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final ActorRef actorRef) throws Throwable { + if (failure instanceof LocalShardNotFoundException) { + LOG.debug("No local shard found for {} - Cannot request leadership transfer to local shard.", + getShardIdentifier(), failure); + makeLeaderLocalAsk.failure(failure); + } else if (failure != null) { + // TODO should this be WARN? + LOG.debug("Failed to find local shard for {} - Cannot request leadership transfer to local shard.", + getShardIdentifier(), failure); + makeLeaderLocalAsk.failure(failure); + } else { + makeLeaderLocalAsk + .completeWith(actorContext + .executeOperationAsync(actorRef, MakeLeaderLocal.INSTANCE, makeLeaderLocalTimeout)); + } + } + }, actorContext.getClientDispatcher()); + + // we have to transform make leader local request result + Future makeLeaderLocalFuture = makeLeaderLocalAsk.future() + .transform(new Mapper() { + @Override + public Void apply(final Object parameter) { + return null; + } + }, new Mapper() { + @Override + public Throwable apply(final Throwable parameter) { + if (parameter instanceof LeadershipTransferFailedException) { + // do nothing with exception and just pass it as it is + return parameter; + } + // wrap exception in LeadershipTransferFailedEx + return new LeadershipTransferFailedException("Leadership transfer failed", parameter); + } + }, actorContext.getClientDispatcher()); + + return FutureConverters.toJava(makeLeaderLocalFuture); + } + + @Override + @Nonnull + public LeaderLocationListenerRegistration + registerLeaderLocationListener(@Nonnull final L listener) { + checkNotClosed(); + Preconditions.checkNotNull(listener); + Preconditions.checkArgument(!listeners.contains(listener), + "Listener {} is already registered with ShardAccess {}", listener, this); + + LOG.debug("Registering LeaderLocationListener {}", listener); + + listeners.add(listener); + + return new LeaderLocationListenerRegistration() { + @Override + public L getInstance() { + return listener; + } + + @Override + public void close() { + listeners.remove(listener); + } + }; + } + + @Override + @SuppressWarnings("checkstyle:IllegalCatch") + public void onLeaderLocationChanged(@Nonnull final LeaderLocation location) { + if (closed) { + // we are closed already. Do not dispatch any new leader location + // change events. + return; + } + + LOG.debug("Received leader location change notification. New leader location: {}", location); + currentLeader = location; + listeners.forEach(listener -> { + try { + listener.onLeaderLocationChanged(location); + } catch (Exception e) { + LOG.warn("Ignoring uncaught exception thrown be LeaderLocationListener {} " + + "during processing leader location change {}", listener, location, e); + } + }); + } + + @Override + public void close() { + // TODO should we also remove all listeners? + LOG.debug("Closing {} ShardAccess", prefix); + closed = true; + + if (roleChangeListenerActor != null) { + // stop RoleChangeListenerActor + roleChangeListenerActor.tell(PoisonPill.getInstance(), noSender()); + roleChangeListenerActor = null; + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java index 9726a986c1..df49e33cdd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java @@ -32,7 +32,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.EnumMap; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CompletionStage; @@ -54,6 +56,8 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateShard; import org.opendaylight.controller.cluster.datastore.shardstrategy.ModuleShardStrategy; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; +import org.opendaylight.controller.cluster.dom.api.CDSDataTreeProducer; +import org.opendaylight.controller.cluster.dom.api.CDSShardAccess; import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor.ShardedDataTreeActorCreator; import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener; import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard; @@ -611,13 +615,13 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat final Future closeFuture = ask.transform( new Mapper() { @Override - public Void apply(Object parameter) { + public Void apply(final Object parameter) { return null; } }, new Mapper() { @Override - public Throwable apply(Throwable throwable) { + public Throwable apply(final Throwable throwable) { return throwable; } }, actorSystem.dispatcher()); @@ -626,12 +630,16 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat } } - private static final class ProxyProducer extends ForwardingObject implements DOMDataTreeProducer { + // TODO what about producers created by this producer? + // They should also be CDSProducers + private static final class ProxyProducer extends ForwardingObject implements CDSDataTreeProducer { private final DOMDataTreeProducer delegate; private final Collection subtrees; private final ActorRef shardDataTreeActor; private final ActorContext actorContext; + @GuardedBy("shardAccessMap") + private final Map shardAccessMap = new HashMap<>(); ProxyProducer(final DOMDataTreeProducer delegate, final Collection subtrees, @@ -658,8 +666,12 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat } @Override + @SuppressWarnings("checkstyle:IllegalCatch") public void close() throws DOMDataTreeProducerException { delegate.close(); + synchronized (shardAccessMap) { + shardAccessMap.values().forEach(CDSShardAccessImpl::close); + } final Object o = actorContext.executeOperation(shardDataTreeActor, new ProducerRemoved(subtrees)); if (o instanceof DOMDataTreeProducerException) { @@ -673,5 +685,24 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat protected DOMDataTreeProducer delegate() { return delegate; } + + @Nonnull + @Override + public CDSShardAccess getShardAccess(@Nonnull final DOMDataTreeIdentifier subtree) { + synchronized (shardAccessMap) { + Preconditions.checkArgument(subtrees.contains(subtree), + "Subtree {} is not controlled by this producer {}", subtree, this); + if (shardAccessMap.get(subtree) != null) { + return shardAccessMap.get(subtree); + } + + // TODO Maybe we can have static factory method and return the same instance + // for same subtrees. But maybe it is not needed since there can be only one + // producer attached to some subtree at a time. And also how we can close ShardAccess + // then + final CDSShardAccessImpl shardAccess = new CDSShardAccessImpl(subtree, actorContext); + return shardAccessMap.put(subtree, shardAccess); + } + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/RoleChangeListenerActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/RoleChangeListenerActor.java new file mode 100644 index 0000000000..38b69e720a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/RoleChangeListenerActor.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.sharding; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; +import org.opendaylight.controller.cluster.dom.api.LeaderLocation; +import org.opendaylight.controller.cluster.dom.api.LeaderLocationListener; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; +import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; +import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; + +/** + * Proxy actor which acts as a facade for user-provided + * {@link LeaderLocationListener}. It subscribes for {@link LeaderStateChanged} + * notifications in its pre start hook and translates them to + * {@link LeaderLocationListener#onLeaderLocationChanged(LeaderLocation)} + * events. + */ +public class RoleChangeListenerActor extends AbstractUntypedActor { + private final LeaderLocationListener leaderLocationListener; + private final ActorRef roleChangeNotifier; + + private RoleChangeListenerActor(final ActorRef roleChangeNotifier, final LeaderLocationListener listener) { + this.roleChangeNotifier = Preconditions.checkNotNull(roleChangeNotifier); + this.leaderLocationListener = Preconditions.checkNotNull(listener); + } + + @Override + public void preStart() throws Exception { + super.preStart(); + roleChangeNotifier.tell(new RegisterRoleChangeListener(), getSelf()); + } + + @Override + protected void handleReceive(final Object message) throws Exception { + if (message instanceof RoleChangeNotification) { + ignoreMessage(message); + } else if (message instanceof LeaderStateChanged) { + onLeaderStateChanged((LeaderStateChanged) message); + } else { + unknownMessage(message); + } + } + + private void onLeaderStateChanged(final LeaderStateChanged message) { + final LeaderLocation newLocation; + if (message.getLeaderId() == null) { + newLocation = LeaderLocation.UNKNOWN; + } else if (message.getMemberId().equals(message.getLeaderId())) { + newLocation = LeaderLocation.LOCAL; + } else { + newLocation = LeaderLocation.REMOTE; + } + + // TODO should we wrap this in try catch block? + leaderLocationListener.onLeaderLocationChanged(newLocation); + } + + public static Props props(final ActorRef roleChangeNotifier, final LeaderLocationListener listener) { + return Props.create(RoleChangeListenerActor.class, roleChangeNotifier, listener); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/CDSShardAccessImplTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/CDSShardAccessImplTest.java new file mode 100644 index 0000000000..cbb07264ae --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/CDSShardAccessImplTest.java @@ -0,0 +1,217 @@ +/* + * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.sharding; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import akka.actor.ActorRef; +import akka.dispatch.Futures; +import com.google.common.base.Optional; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.AbstractActorTest; +import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.dom.api.LeaderLocation; +import org.opendaylight.controller.cluster.dom.api.LeaderLocationListener; +import org.opendaylight.controller.cluster.dom.api.LeaderLocationListenerRegistration; +import org.opendaylight.controller.cluster.raft.LeadershipTransferFailedException; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +public class CDSShardAccessImplTest extends AbstractActorTest { + + private static final DOMDataTreeIdentifier TEST_ID = + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH); + + private CDSShardAccessImpl shardAccess; + private ActorContext context; + + @Before + public void setUp() { + context = mock(ActorContext.class); + final DatastoreContext datastoreContext = DatastoreContext.newBuilder().build(); + doReturn(Optional.of(getSystem().deadLetters())).when(context).findLocalShard(any()); + doReturn(datastoreContext).when(context).getDatastoreContext(); + doReturn(getSystem()).when(context).getActorSystem(); + shardAccess = new CDSShardAccessImpl(TEST_ID, context); + } + + @Test + @SuppressWarnings("checkstyle:IllegalCatch") + public void testRegisterLeaderLocationListener() { + final LeaderLocationListener listener1 = mock(LeaderLocationListener.class); + + // first registration should be OK + shardAccess.registerLeaderLocationListener(listener1); + + // second registration should fail with IllegalArgumentEx + try { + shardAccess.registerLeaderLocationListener(listener1); + fail("Should throw exception"); + } catch (final Exception e) { + assertTrue(e instanceof IllegalArgumentException); + } + + // null listener registration should fail with NPE + try { + shardAccess.registerLeaderLocationListener(null); + fail("Should throw exception"); + } catch (final Exception e) { + assertTrue(e instanceof NullPointerException); + } + + // registering listener on closed shard access should fail with IllegalStateEx + final LeaderLocationListener listener2 = mock(LeaderLocationListener.class); + shardAccess.close(); + try { + shardAccess.registerLeaderLocationListener(listener2); + fail("Should throw exception"); + } catch (final Exception ex) { + assertTrue(ex instanceof IllegalStateException); + } + } + + @Test + @SuppressWarnings("checkstyle:IllegalCatch") + public void testOnLeaderLocationChanged() { + final LeaderLocationListener listener1 = mock(LeaderLocationListener.class); + doThrow(new RuntimeException("Failed")).when(listener1).onLeaderLocationChanged(any()); + final LeaderLocationListener listener2 = mock(LeaderLocationListener.class); + doNothing().when(listener2).onLeaderLocationChanged(any()); + final LeaderLocationListener listener3 = mock(LeaderLocationListener.class); + doNothing().when(listener3).onLeaderLocationChanged(any()); + + final LeaderLocationListenerRegistration reg1 = shardAccess.registerLeaderLocationListener(listener1); + final LeaderLocationListenerRegistration reg2 = shardAccess.registerLeaderLocationListener(listener2); + final LeaderLocationListenerRegistration reg3 = shardAccess.registerLeaderLocationListener(listener3); + + // Error in listener1 should not affect dispatching change to other listeners + shardAccess.onLeaderLocationChanged(LeaderLocation.LOCAL); + verify(listener1).onLeaderLocationChanged(eq(LeaderLocation.LOCAL)); + verify(listener2).onLeaderLocationChanged(eq(LeaderLocation.LOCAL)); + verify(listener3).onLeaderLocationChanged(eq(LeaderLocation.LOCAL)); + + // Closed listeners shouldn't see new leader location changes + reg1.close(); + reg2.close(); + shardAccess.onLeaderLocationChanged(LeaderLocation.REMOTE); + verify(listener3).onLeaderLocationChanged(eq(LeaderLocation.REMOTE)); + verifyNoMoreInteractions(listener1); + verifyNoMoreInteractions(listener2); + + // Closed shard access should not dispatch any new events + shardAccess.close(); + shardAccess.onLeaderLocationChanged(LeaderLocation.UNKNOWN); + verifyNoMoreInteractions(listener1); + verifyNoMoreInteractions(listener2); + verifyNoMoreInteractions(listener3); + + reg3.close(); + } + + @Test + @SuppressWarnings("checkstyle:IllegalCatch") + public void testGetShardIdentifier() { + assertEquals(shardAccess.getShardIdentifier(), TEST_ID); + + // closed shard access should throw illegal state + shardAccess.close(); + try { + shardAccess.getShardIdentifier(); + fail("Exception expected"); + } catch (final Exception e) { + assertTrue(e instanceof IllegalStateException); + } + } + + @Test + @SuppressWarnings("checkstyle:IllegalCatch") + public void testGetLeaderLocation() { + // new shard access does not know anything about leader location + assertEquals(shardAccess.getLeaderLocation(), LeaderLocation.UNKNOWN); + + // we start getting leader location changes notifications + shardAccess.onLeaderLocationChanged(LeaderLocation.LOCAL); + assertEquals(shardAccess.getLeaderLocation(), LeaderLocation.LOCAL); + + shardAccess.onLeaderLocationChanged(LeaderLocation.REMOTE); + shardAccess.onLeaderLocationChanged(LeaderLocation.UNKNOWN); + assertEquals(shardAccess.getLeaderLocation(), LeaderLocation.UNKNOWN); + + // closed shard access throws illegal state + shardAccess.close(); + try { + shardAccess.getLeaderLocation(); + fail("Should have failed with IllegalStateEx"); + } catch (Exception e) { + assertTrue(e instanceof IllegalStateException); + } + } + + @Test + @SuppressWarnings("checkstyle:IllegalCatch") + public void testMakeLeaderLocal() throws Exception { + final FiniteDuration timeout = new FiniteDuration(5, TimeUnit.SECONDS); + final ActorRef localShardRef = mock(ActorRef.class); + final Future localShardRefFuture = Futures.successful(localShardRef); + doReturn(localShardRefFuture).when(context).findLocalShardAsync(any()); + + // MakeLeaderLocal will reply with success + doReturn(Futures.successful(null)).when(context).executeOperationAsync((ActorRef) any(), any(), any()); + doReturn(getSystem().dispatcher()).when(context).getClientDispatcher(); + assertEquals(waitOnAsyncTask(shardAccess.makeLeaderLocal(), timeout), null); + + // MakeLeaderLocal will reply with failure + doReturn(Futures.failed(new LeadershipTransferFailedException("Failure"))) + .when(context).executeOperationAsync((ActorRef) any(), any(), any()); + + try { + waitOnAsyncTask(shardAccess.makeLeaderLocal(), timeout); + fail("makeLeaderLocal operation should not be successful"); + } catch (final Exception e) { + assertTrue(e instanceof LeadershipTransferFailedException); + } + + // we don't even find local shard + doReturn(Futures.failed(new LocalShardNotFoundException("Local shard not found"))) + .when(context).findLocalShardAsync(any()); + + try { + waitOnAsyncTask(shardAccess.makeLeaderLocal(), timeout); + fail("makeLeaderLocal operation should not be successful"); + } catch (final Exception e) { + assertTrue(e instanceof LeadershipTransferFailedException); + assertTrue(e.getCause() instanceof LocalShardNotFoundException); + } + + // closed shard access should throw IllegalStateEx + shardAccess.close(); + try { + shardAccess.makeLeaderLocal(); + fail("Should have thrown IllegalStateEx. ShardAccess is closed"); + } catch (final Exception e) { + assertTrue(e instanceof IllegalStateException); + } + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/RoleChangeListenerActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/RoleChangeListenerActorTest.java new file mode 100644 index 0000000000..978cc27c9a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/RoleChangeListenerActorTest.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.sharding; + +import static akka.actor.ActorRef.noSender; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.testkit.JavaTestKit; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.AbstractActorTest; +import org.opendaylight.controller.cluster.dom.api.LeaderLocation; +import org.opendaylight.controller.cluster.dom.api.LeaderLocationListener; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; +import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; + +public class RoleChangeListenerActorTest extends AbstractActorTest { + + @Test + public void testRegisterRoleChangeListenerOnStart() { + new JavaTestKit(getSystem()) { + { + final LeaderLocationListener listener = mock(LeaderLocationListener.class); + final Props props = RoleChangeListenerActor.props(getRef(), listener); + + getSystem().actorOf(props, "testRegisterRoleChangeListenerOnStart"); + expectMsgClass(RegisterRoleChangeListener.class); + } + }; + } + + @Test + public void testOnDataTreeChanged() { + final LeaderLocationListener listener = mock(LeaderLocationListener.class); + doNothing().when(listener).onLeaderLocationChanged(any()); + final Props props = RoleChangeListenerActor.props(getSystem().deadLetters(), listener); + + final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedChanged"); + + subject.tell(new LeaderStateChanged("member-1", null, (short) 0), noSender()); + verify(listener, timeout(5000)).onLeaderLocationChanged(eq(LeaderLocation.UNKNOWN)); + + subject.tell(new LeaderStateChanged("member-1", "member-1", (short) 0), noSender()); + verify(listener, timeout(5000)).onLeaderLocationChanged(eq(LeaderLocation.LOCAL)); + + subject.tell(new LeaderStateChanged("member-1", "member-2", (short) 0), noSender()); + verify(listener, timeout(5000)).onLeaderLocationChanged(eq(LeaderLocation.REMOTE)); + + } +} \ No newline at end of file -- 2.36.6