public LeadershipTransferFailedException(final String message) {
super(message);
}
+
+ public LeadershipTransferFailedException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.sharding;
+
+import static akka.actor.ActorRef.noSender;
+
+import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
+import akka.dispatch.Futures;
+import akka.dispatch.Mapper;
+import akka.dispatch.OnComplete;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
+import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.dom.api.CDSShardAccess;
+import org.opendaylight.controller.cluster.dom.api.LeaderLocation;
+import org.opendaylight.controller.cluster.dom.api.LeaderLocationListener;
+import org.opendaylight.controller.cluster.dom.api.LeaderLocationListenerRegistration;
+import org.opendaylight.controller.cluster.raft.LeadershipTransferFailedException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.compat.java8.FutureConverters;
+import scala.concurrent.Future;
+
+
+/**
+ * Default {@link CDSShardAccess} implementation. Listens on leader location
+ * change events and distributes them to registered listeners. Also updates
+ * current information about leader location accordingly.
+ *
+ * <p>
+ * Sends {@link MakeLeaderLocal} message to local shards and translates its result
+ * on behalf users {@link #makeLeaderLocal()} calls.
+ *
+ * <p>
+ * {@link org.opendaylight.controller.cluster.dom.api.CDSDataTreeProducer} that
+ * creates instances of this class has to call {@link #close()} once it is no
+ * longer valid.
+ */
+final class CDSShardAccessImpl implements CDSShardAccess, LeaderLocationListener, AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(CDSShardAccessImpl.class);
+
+ private final Collection<LeaderLocationListener> listeners = ConcurrentHashMap.newKeySet();
+ private final DOMDataTreeIdentifier prefix;
+ private final ActorContext actorContext;
+ private final Timeout makeLeaderLocalTimeout;
+
+ private ActorRef roleChangeListenerActor;
+
+ private volatile LeaderLocation currentLeader = LeaderLocation.UNKNOWN;
+ private volatile boolean closed = false;
+
+ CDSShardAccessImpl(final DOMDataTreeIdentifier prefix, final ActorContext actorContext) {
+ this.prefix = Preconditions.checkNotNull(prefix);
+ this.actorContext = Preconditions.checkNotNull(actorContext);
+ this.makeLeaderLocalTimeout =
+ new Timeout(actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().$times(2));
+
+ // register RoleChangeListenerActor
+ // TODO Maybe we should do this in async
+ final Optional<ActorRef> localShardReply =
+ actorContext.findLocalShard(ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
+ Preconditions.checkState(localShardReply.isPresent(),
+ "Local shard for {} not present. Cannot register RoleChangeListenerActor", prefix);
+ roleChangeListenerActor =
+ actorContext.getActorSystem().actorOf(RoleChangeListenerActor.props(localShardReply.get(), this));
+ }
+
+ private void checkNotClosed() {
+ Preconditions.checkState(!closed,
+ "CDSDataTreeProducer, that this CDSShardAccess is associated with, is no longer valid");
+ }
+
+ @Override
+ @Nonnull
+ public DOMDataTreeIdentifier getShardIdentifier() {
+ checkNotClosed();
+ return prefix;
+ }
+
+ @Override
+ @Nonnull
+ public LeaderLocation getLeaderLocation() {
+ checkNotClosed();
+ // TODO before getting first notification from roleChangeListenerActor
+ // we will always return UNKNOWN
+ return currentLeader;
+ }
+
+ @Override
+ @Nonnull
+ public CompletionStage<Void> makeLeaderLocal() {
+ // TODO when we have running make leader local operation
+ // we should just return the same completion stage
+ checkNotClosed();
+
+ // TODO can we cache local shard actorRef?
+ final Future<ActorRef> localShardReply =
+ actorContext.findLocalShardAsync(ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
+
+ // we have to tell local shard to make leader local
+ final scala.concurrent.Promise<Object> makeLeaderLocalAsk = Futures.promise();
+ localShardReply.onComplete(new OnComplete<ActorRef>() {
+ @Override
+ public void onComplete(final Throwable failure, final ActorRef actorRef) throws Throwable {
+ if (failure instanceof LocalShardNotFoundException) {
+ LOG.debug("No local shard found for {} - Cannot request leadership transfer to local shard.",
+ getShardIdentifier(), failure);
+ makeLeaderLocalAsk.failure(failure);
+ } else if (failure != null) {
+ // TODO should this be WARN?
+ LOG.debug("Failed to find local shard for {} - Cannot request leadership transfer to local shard.",
+ getShardIdentifier(), failure);
+ makeLeaderLocalAsk.failure(failure);
+ } else {
+ makeLeaderLocalAsk
+ .completeWith(actorContext
+ .executeOperationAsync(actorRef, MakeLeaderLocal.INSTANCE, makeLeaderLocalTimeout));
+ }
+ }
+ }, actorContext.getClientDispatcher());
+
+ // we have to transform make leader local request result
+ Future<Void> makeLeaderLocalFuture = makeLeaderLocalAsk.future()
+ .transform(new Mapper<Object, Void>() {
+ @Override
+ public Void apply(final Object parameter) {
+ return null;
+ }
+ }, new Mapper<Throwable, Throwable>() {
+ @Override
+ public Throwable apply(final Throwable parameter) {
+ if (parameter instanceof LeadershipTransferFailedException) {
+ // do nothing with exception and just pass it as it is
+ return parameter;
+ }
+ // wrap exception in LeadershipTransferFailedEx
+ return new LeadershipTransferFailedException("Leadership transfer failed", parameter);
+ }
+ }, actorContext.getClientDispatcher());
+
+ return FutureConverters.toJava(makeLeaderLocalFuture);
+ }
+
+ @Override
+ @Nonnull
+ public <L extends LeaderLocationListener> LeaderLocationListenerRegistration<L>
+ registerLeaderLocationListener(@Nonnull final L listener) {
+ checkNotClosed();
+ Preconditions.checkNotNull(listener);
+ Preconditions.checkArgument(!listeners.contains(listener),
+ "Listener {} is already registered with ShardAccess {}", listener, this);
+
+ LOG.debug("Registering LeaderLocationListener {}", listener);
+
+ listeners.add(listener);
+
+ return new LeaderLocationListenerRegistration<L>() {
+ @Override
+ public L getInstance() {
+ return listener;
+ }
+
+ @Override
+ public void close() {
+ listeners.remove(listener);
+ }
+ };
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public void onLeaderLocationChanged(@Nonnull final LeaderLocation location) {
+ if (closed) {
+ // we are closed already. Do not dispatch any new leader location
+ // change events.
+ return;
+ }
+
+ LOG.debug("Received leader location change notification. New leader location: {}", location);
+ currentLeader = location;
+ listeners.forEach(listener -> {
+ try {
+ listener.onLeaderLocationChanged(location);
+ } catch (Exception e) {
+ LOG.warn("Ignoring uncaught exception thrown be LeaderLocationListener {} "
+ + "during processing leader location change {}", listener, location, e);
+ }
+ });
+ }
+
+ @Override
+ public void close() {
+ // TODO should we also remove all listeners?
+ LOG.debug("Closing {} ShardAccess", prefix);
+ closed = true;
+
+ if (roleChangeListenerActor != null) {
+ // stop RoleChangeListenerActor
+ roleChangeListenerActor.tell(PoisonPill.getInstance(), noSender());
+ roleChangeListenerActor = null;
+ }
+ }
+}
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ModuleShardStrategy;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.dom.api.CDSDataTreeProducer;
+import org.opendaylight.controller.cluster.dom.api.CDSShardAccess;
import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor.ShardedDataTreeActorCreator;
import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard;
final Future<Void> closeFuture = ask.transform(
new Mapper<Object, Void>() {
@Override
- public Void apply(Object parameter) {
+ public Void apply(final Object parameter) {
return null;
}
},
new Mapper<Throwable, Throwable>() {
@Override
- public Throwable apply(Throwable throwable) {
+ public Throwable apply(final Throwable throwable) {
return throwable;
}
}, actorSystem.dispatcher());
}
}
- private static final class ProxyProducer extends ForwardingObject implements DOMDataTreeProducer {
+ // TODO what about producers created by this producer?
+ // They should also be CDSProducers
+ private static final class ProxyProducer extends ForwardingObject implements CDSDataTreeProducer {
private final DOMDataTreeProducer delegate;
private final Collection<DOMDataTreeIdentifier> subtrees;
private final ActorRef shardDataTreeActor;
private final ActorContext actorContext;
+ @GuardedBy("shardAccessMap")
+ private final Map<DOMDataTreeIdentifier, CDSShardAccessImpl> shardAccessMap = new HashMap<>();
ProxyProducer(final DOMDataTreeProducer delegate,
final Collection<DOMDataTreeIdentifier> subtrees,
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public void close() throws DOMDataTreeProducerException {
delegate.close();
+ synchronized (shardAccessMap) {
+ shardAccessMap.values().forEach(CDSShardAccessImpl::close);
+ }
final Object o = actorContext.executeOperation(shardDataTreeActor, new ProducerRemoved(subtrees));
if (o instanceof DOMDataTreeProducerException) {
protected DOMDataTreeProducer delegate() {
return delegate;
}
+
+ @Nonnull
+ @Override
+ public CDSShardAccess getShardAccess(@Nonnull final DOMDataTreeIdentifier subtree) {
+ synchronized (shardAccessMap) {
+ Preconditions.checkArgument(subtrees.contains(subtree),
+ "Subtree {} is not controlled by this producer {}", subtree, this);
+ if (shardAccessMap.get(subtree) != null) {
+ return shardAccessMap.get(subtree);
+ }
+
+ // TODO Maybe we can have static factory method and return the same instance
+ // for same subtrees. But maybe it is not needed since there can be only one
+ // producer attached to some subtree at a time. And also how we can close ShardAccess
+ // then
+ final CDSShardAccessImpl shardAccess = new CDSShardAccessImpl(subtree, actorContext);
+ return shardAccessMap.put(subtree, shardAccess);
+ }
+ }
}
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.dom.api.LeaderLocation;
+import org.opendaylight.controller.cluster.dom.api.LeaderLocationListener;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
+
+/**
+ * Proxy actor which acts as a facade for user-provided
+ * {@link LeaderLocationListener}. It subscribes for {@link LeaderStateChanged}
+ * notifications in its pre start hook and translates them to
+ * {@link LeaderLocationListener#onLeaderLocationChanged(LeaderLocation)}
+ * events.
+ */
+public class RoleChangeListenerActor extends AbstractUntypedActor {
+ private final LeaderLocationListener leaderLocationListener;
+ private final ActorRef roleChangeNotifier;
+
+ private RoleChangeListenerActor(final ActorRef roleChangeNotifier, final LeaderLocationListener listener) {
+ this.roleChangeNotifier = Preconditions.checkNotNull(roleChangeNotifier);
+ this.leaderLocationListener = Preconditions.checkNotNull(listener);
+ }
+
+ @Override
+ public void preStart() throws Exception {
+ super.preStart();
+ roleChangeNotifier.tell(new RegisterRoleChangeListener(), getSelf());
+ }
+
+ @Override
+ protected void handleReceive(final Object message) throws Exception {
+ if (message instanceof RoleChangeNotification) {
+ ignoreMessage(message);
+ } else if (message instanceof LeaderStateChanged) {
+ onLeaderStateChanged((LeaderStateChanged) message);
+ } else {
+ unknownMessage(message);
+ }
+ }
+
+ private void onLeaderStateChanged(final LeaderStateChanged message) {
+ final LeaderLocation newLocation;
+ if (message.getLeaderId() == null) {
+ newLocation = LeaderLocation.UNKNOWN;
+ } else if (message.getMemberId().equals(message.getLeaderId())) {
+ newLocation = LeaderLocation.LOCAL;
+ } else {
+ newLocation = LeaderLocation.REMOTE;
+ }
+
+ // TODO should we wrap this in try catch block?
+ leaderLocationListener.onLeaderLocationChanged(newLocation);
+ }
+
+ public static Props props(final ActorRef roleChangeNotifier, final LeaderLocationListener listener) {
+ return Props.create(RoleChangeListenerActor.class, roleChangeNotifier, listener);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.sharding;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import akka.actor.ActorRef;
+import akka.dispatch.Futures;
+import com.google.common.base.Optional;
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.dom.api.LeaderLocation;
+import org.opendaylight.controller.cluster.dom.api.LeaderLocationListener;
+import org.opendaylight.controller.cluster.dom.api.LeaderLocationListenerRegistration;
+import org.opendaylight.controller.cluster.raft.LeadershipTransferFailedException;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+public class CDSShardAccessImplTest extends AbstractActorTest {
+
+ private static final DOMDataTreeIdentifier TEST_ID =
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
+
+ private CDSShardAccessImpl shardAccess;
+ private ActorContext context;
+
+ @Before
+ public void setUp() {
+ context = mock(ActorContext.class);
+ final DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
+ doReturn(Optional.of(getSystem().deadLetters())).when(context).findLocalShard(any());
+ doReturn(datastoreContext).when(context).getDatastoreContext();
+ doReturn(getSystem()).when(context).getActorSystem();
+ shardAccess = new CDSShardAccessImpl(TEST_ID, context);
+ }
+
+ @Test
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public void testRegisterLeaderLocationListener() {
+ final LeaderLocationListener listener1 = mock(LeaderLocationListener.class);
+
+ // first registration should be OK
+ shardAccess.registerLeaderLocationListener(listener1);
+
+ // second registration should fail with IllegalArgumentEx
+ try {
+ shardAccess.registerLeaderLocationListener(listener1);
+ fail("Should throw exception");
+ } catch (final Exception e) {
+ assertTrue(e instanceof IllegalArgumentException);
+ }
+
+ // null listener registration should fail with NPE
+ try {
+ shardAccess.registerLeaderLocationListener(null);
+ fail("Should throw exception");
+ } catch (final Exception e) {
+ assertTrue(e instanceof NullPointerException);
+ }
+
+ // registering listener on closed shard access should fail with IllegalStateEx
+ final LeaderLocationListener listener2 = mock(LeaderLocationListener.class);
+ shardAccess.close();
+ try {
+ shardAccess.registerLeaderLocationListener(listener2);
+ fail("Should throw exception");
+ } catch (final Exception ex) {
+ assertTrue(ex instanceof IllegalStateException);
+ }
+ }
+
+ @Test
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public void testOnLeaderLocationChanged() {
+ final LeaderLocationListener listener1 = mock(LeaderLocationListener.class);
+ doThrow(new RuntimeException("Failed")).when(listener1).onLeaderLocationChanged(any());
+ final LeaderLocationListener listener2 = mock(LeaderLocationListener.class);
+ doNothing().when(listener2).onLeaderLocationChanged(any());
+ final LeaderLocationListener listener3 = mock(LeaderLocationListener.class);
+ doNothing().when(listener3).onLeaderLocationChanged(any());
+
+ final LeaderLocationListenerRegistration reg1 = shardAccess.registerLeaderLocationListener(listener1);
+ final LeaderLocationListenerRegistration reg2 = shardAccess.registerLeaderLocationListener(listener2);
+ final LeaderLocationListenerRegistration reg3 = shardAccess.registerLeaderLocationListener(listener3);
+
+ // Error in listener1 should not affect dispatching change to other listeners
+ shardAccess.onLeaderLocationChanged(LeaderLocation.LOCAL);
+ verify(listener1).onLeaderLocationChanged(eq(LeaderLocation.LOCAL));
+ verify(listener2).onLeaderLocationChanged(eq(LeaderLocation.LOCAL));
+ verify(listener3).onLeaderLocationChanged(eq(LeaderLocation.LOCAL));
+
+ // Closed listeners shouldn't see new leader location changes
+ reg1.close();
+ reg2.close();
+ shardAccess.onLeaderLocationChanged(LeaderLocation.REMOTE);
+ verify(listener3).onLeaderLocationChanged(eq(LeaderLocation.REMOTE));
+ verifyNoMoreInteractions(listener1);
+ verifyNoMoreInteractions(listener2);
+
+ // Closed shard access should not dispatch any new events
+ shardAccess.close();
+ shardAccess.onLeaderLocationChanged(LeaderLocation.UNKNOWN);
+ verifyNoMoreInteractions(listener1);
+ verifyNoMoreInteractions(listener2);
+ verifyNoMoreInteractions(listener3);
+
+ reg3.close();
+ }
+
+ @Test
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public void testGetShardIdentifier() {
+ assertEquals(shardAccess.getShardIdentifier(), TEST_ID);
+
+ // closed shard access should throw illegal state
+ shardAccess.close();
+ try {
+ shardAccess.getShardIdentifier();
+ fail("Exception expected");
+ } catch (final Exception e) {
+ assertTrue(e instanceof IllegalStateException);
+ }
+ }
+
+ @Test
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public void testGetLeaderLocation() {
+ // new shard access does not know anything about leader location
+ assertEquals(shardAccess.getLeaderLocation(), LeaderLocation.UNKNOWN);
+
+ // we start getting leader location changes notifications
+ shardAccess.onLeaderLocationChanged(LeaderLocation.LOCAL);
+ assertEquals(shardAccess.getLeaderLocation(), LeaderLocation.LOCAL);
+
+ shardAccess.onLeaderLocationChanged(LeaderLocation.REMOTE);
+ shardAccess.onLeaderLocationChanged(LeaderLocation.UNKNOWN);
+ assertEquals(shardAccess.getLeaderLocation(), LeaderLocation.UNKNOWN);
+
+ // closed shard access throws illegal state
+ shardAccess.close();
+ try {
+ shardAccess.getLeaderLocation();
+ fail("Should have failed with IllegalStateEx");
+ } catch (Exception e) {
+ assertTrue(e instanceof IllegalStateException);
+ }
+ }
+
+ @Test
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public void testMakeLeaderLocal() throws Exception {
+ final FiniteDuration timeout = new FiniteDuration(5, TimeUnit.SECONDS);
+ final ActorRef localShardRef = mock(ActorRef.class);
+ final Future<ActorRef> localShardRefFuture = Futures.successful(localShardRef);
+ doReturn(localShardRefFuture).when(context).findLocalShardAsync(any());
+
+ // MakeLeaderLocal will reply with success
+ doReturn(Futures.successful(null)).when(context).executeOperationAsync((ActorRef) any(), any(), any());
+ doReturn(getSystem().dispatcher()).when(context).getClientDispatcher();
+ assertEquals(waitOnAsyncTask(shardAccess.makeLeaderLocal(), timeout), null);
+
+ // MakeLeaderLocal will reply with failure
+ doReturn(Futures.failed(new LeadershipTransferFailedException("Failure")))
+ .when(context).executeOperationAsync((ActorRef) any(), any(), any());
+
+ try {
+ waitOnAsyncTask(shardAccess.makeLeaderLocal(), timeout);
+ fail("makeLeaderLocal operation should not be successful");
+ } catch (final Exception e) {
+ assertTrue(e instanceof LeadershipTransferFailedException);
+ }
+
+ // we don't even find local shard
+ doReturn(Futures.failed(new LocalShardNotFoundException("Local shard not found")))
+ .when(context).findLocalShardAsync(any());
+
+ try {
+ waitOnAsyncTask(shardAccess.makeLeaderLocal(), timeout);
+ fail("makeLeaderLocal operation should not be successful");
+ } catch (final Exception e) {
+ assertTrue(e instanceof LeadershipTransferFailedException);
+ assertTrue(e.getCause() instanceof LocalShardNotFoundException);
+ }
+
+ // closed shard access should throw IllegalStateEx
+ shardAccess.close();
+ try {
+ shardAccess.makeLeaderLocal();
+ fail("Should have thrown IllegalStateEx. ShardAccess is closed");
+ } catch (final Exception e) {
+ assertTrue(e instanceof IllegalStateException);
+ }
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import static akka.actor.ActorRef.noSender;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.dom.api.LeaderLocation;
+import org.opendaylight.controller.cluster.dom.api.LeaderLocationListener;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+
+public class RoleChangeListenerActorTest extends AbstractActorTest {
+
+ @Test
+ public void testRegisterRoleChangeListenerOnStart() {
+ new JavaTestKit(getSystem()) {
+ {
+ final LeaderLocationListener listener = mock(LeaderLocationListener.class);
+ final Props props = RoleChangeListenerActor.props(getRef(), listener);
+
+ getSystem().actorOf(props, "testRegisterRoleChangeListenerOnStart");
+ expectMsgClass(RegisterRoleChangeListener.class);
+ }
+ };
+ }
+
+ @Test
+ public void testOnDataTreeChanged() {
+ final LeaderLocationListener listener = mock(LeaderLocationListener.class);
+ doNothing().when(listener).onLeaderLocationChanged(any());
+ final Props props = RoleChangeListenerActor.props(getSystem().deadLetters(), listener);
+
+ final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedChanged");
+
+ subject.tell(new LeaderStateChanged("member-1", null, (short) 0), noSender());
+ verify(listener, timeout(5000)).onLeaderLocationChanged(eq(LeaderLocation.UNKNOWN));
+
+ subject.tell(new LeaderStateChanged("member-1", "member-1", (short) 0), noSender());
+ verify(listener, timeout(5000)).onLeaderLocationChanged(eq(LeaderLocation.LOCAL));
+
+ subject.tell(new LeaderStateChanged("member-1", "member-2", (short) 0), noSender());
+ verify(listener, timeout(5000)).onLeaderLocationChanged(eq(LeaderLocation.REMOTE));
+
+ }
+}
\ No newline at end of file