--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership;
+
+import akka.actor.Props;
+import akka.japi.Creator;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.EntityOwnershipChanged;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An actor which is responsible for notifying an EntityOwnershipListener of changes.
+ *
+ * @author Thomas Pantelis
+ */
+class EntityOwnershipListenerActor extends AbstractUntypedActor {
+ private static final Logger LOG = LoggerFactory.getLogger(EntityOwnershipListenerActor.class);
+
+ private final EntityOwnershipListener listener;
+
+ private EntityOwnershipListenerActor(EntityOwnershipListener listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ protected void handleReceive(Object message) {
+ if(message instanceof EntityOwnershipChanged) {
+ onEntityOwnershipChanged((EntityOwnershipChanged)message);
+ }
+ }
+
+ private void onEntityOwnershipChanged(EntityOwnershipChanged change) {
+ LOG.debug("Notifying EntityOwnershipListener {}: {}", listener, change);
+
+ try {
+ listener.ownershipChanged(change.getEntity(), change.wasOwner(), change.isOwner());
+ } catch (Exception e) {
+ LOG.error("Error notifying listener {}", listener, e);
+ }
+ }
+
+ static Props props(EntityOwnershipListener listener) {
+ return Props.create(new EntityOwnershipListenerCreator(listener));
+ }
+
+ private static final class EntityOwnershipListenerCreator implements Creator<EntityOwnershipListenerActor> {
+ private static final long serialVersionUID = 1L;
+
+ private final EntityOwnershipListener listener;
+
+ EntityOwnershipListenerCreator(EntityOwnershipListener listener) {
+ this.listener = Preconditions.checkNotNull(listener);
+ }
+
+ @Override
+ public EntityOwnershipListenerActor create() {
+ return new EntityOwnershipListenerActor(listener);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership;
+
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.util.Collection;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.EntityOwnershipChanged;
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages EntityOwnershipListener registrations and notifications for the EntityOwnershipShard.
+ *
+ * @author Thomas Pantelis
+ */
+class EntityOwnershipListenerSupport {
+ private static final Logger LOG = LoggerFactory.getLogger(EntityOwnershipListenerSupport.class);
+
+ private final ActorContext actorContext;
+ private final Map<EntityOwnershipListener, ListenerActorRefEntry> listenerActorMap = new IdentityHashMap<>();
+ private final Multimap<Entity, EntityOwnershipListener> entityListenerMap = HashMultimap.create();
+
+ EntityOwnershipListenerSupport(ActorContext actorContext) {
+ this.actorContext = actorContext;
+ }
+
+ void addEntityOwnershipListener(Entity entity, EntityOwnershipListener listener) {
+ LOG.debug("Adding EntityOwnershipListener {} for {}", listener, entity);
+
+ if(entityListenerMap.put(entity, listener)) {
+ ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener);
+ if(listenerEntry == null) {
+ listenerActorMap.put(listener, new ListenerActorRefEntry());
+ } else {
+ listenerEntry.referenceCount++;
+ }
+ }
+ }
+
+ void removeEntityOwnershipListener(Entity entity, EntityOwnershipListener listener) {
+ LOG.debug("Removing EntityOwnershipListener {} for {}", listener, entity);
+
+ if(entityListenerMap.remove(entity, listener)) {
+ ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener);
+
+ LOG.debug("Found {}", listenerEntry);
+
+ listenerEntry.referenceCount--;
+ if(listenerEntry.referenceCount <= 0) {
+ listenerActorMap.remove(listener);
+
+ if(listenerEntry.actorRef != null) {
+ LOG.debug("Killing EntityOwnershipListenerActor {}", listenerEntry.actorRef);
+ listenerEntry.actorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+ }
+ }
+ }
+
+ void notifyEntityOwnershipListeners(Entity entity, boolean wasOwner, boolean isOwner) {
+ Collection<EntityOwnershipListener> listeners = entityListenerMap.get(entity);
+ if(listeners.isEmpty()) {
+ return;
+ }
+
+ EntityOwnershipChanged changed = new EntityOwnershipChanged(entity, wasOwner, isOwner);
+ for(EntityOwnershipListener listener: listeners) {
+ ActorRef listenerActor = listenerActorFor(listener);
+
+ LOG.debug("Notifying EntityOwnershipListenerActor {} with {}", listenerActor,changed);
+
+ listenerActor.tell(changed, ActorRef.noSender());
+ }
+ }
+
+ private ActorRef listenerActorFor(EntityOwnershipListener listener) {
+ return listenerActorMap.get(listener).actorFor(listener);
+ }
+
+ private class ListenerActorRefEntry {
+ ActorRef actorRef;
+ int referenceCount = 1;
+
+ ActorRef actorFor(EntityOwnershipListener listener) {
+ if(actorRef == null) {
+ actorRef = actorContext.actorOf(EntityOwnershipListenerActor.props(listener));
+
+ LOG.debug("Created EntityOwnershipListenerActor {} for listener {}", actorRef, listener);
+ }
+
+ return actorRef;
+ }
+
+ @Override
+ public String toString() {
+ return "ListenerActorRefEntry [actorRef=" + actorRef + ", referenceCount=" + referenceCount + "]";
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership.messages;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+
+/**
+ * Message encapsulating an entity ownership change.
+ *
+ * @author Thomas Pantelis
+ */
+public class EntityOwnershipChanged {
+ private final Entity entity;
+ private final boolean wasOwner;
+ private final boolean isOwner;
+
+ public EntityOwnershipChanged(Entity entity, boolean wasOwner, boolean isOwner) {
+ this.entity = Preconditions.checkNotNull(entity, "entity can't be null");
+ this.wasOwner = wasOwner;
+ this.isOwner = isOwner;
+ }
+
+ public Entity getEntity() {
+ return entity;
+ }
+
+ public boolean wasOwner() {
+ return wasOwner;
+ }
+
+ public boolean isOwner() {
+ return isOwner;
+ }
+
+ @Override
+ public String toString() {
+ return "EntityOwnershipChanged [entity=" + entity + ", wasOwner=" + wasOwner + ", isOwner=" + isOwner + "]";
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership;
+
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import akka.actor.ActorRef;
+import akka.testkit.TestActorRef;
+import org.junit.After;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.EntityOwnershipChanged;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Unit tests for EntityOwnershipListenerActor.
+ *
+ * @author Thomas Pantelis
+ */
+public class EntityOwnershipListenerActorTest extends AbstractActorTest {
+ private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+
+ @After
+ public void tearDown() {
+ actorFactory.close();
+ }
+
+ @Test
+ public void testOnEntityOwnershipChanged() {
+ EntityOwnershipListener mockListener = mock(EntityOwnershipListener.class);
+
+ TestActorRef<EntityOwnershipListenerActor> listenerActor = actorFactory.createTestActor(
+ EntityOwnershipListenerActor.props(mockListener), actorFactory.generateActorId("listener"));
+
+ Entity entity = new Entity("test", YangInstanceIdentifier.of(QName.create("test", "id1")));
+ boolean wasOwner = false;
+ boolean isOwner = true;
+ listenerActor.tell(new EntityOwnershipChanged(entity, wasOwner, isOwner), ActorRef.noSender());
+
+ verify(mockListener, timeout(5000)).ownershipChanged(entity, wasOwner, isOwner);
+ }
+
+ @Test
+ public void testOnEntityOwnershipChangedWithListenerEx() {
+ EntityOwnershipListener mockListener = mock(EntityOwnershipListener.class);
+
+ Entity entity1 = new Entity("test", YangInstanceIdentifier.of(QName.create("test", "id1")));
+ doThrow(new RuntimeException("mock")).when(mockListener).ownershipChanged(entity1, false, true);
+ Entity entity2 = new Entity("test", YangInstanceIdentifier.of(QName.create("test", "id2")));
+ doNothing().when(mockListener).ownershipChanged(entity2, true, false);
+
+ TestActorRef<EntityOwnershipListenerActor> listenerActor = actorFactory.createTestActor(
+ EntityOwnershipListenerActor.props(mockListener), actorFactory.generateActorId("listener"));
+
+ listenerActor.tell(new EntityOwnershipChanged(entity1, false, true), ActorRef.noSender());
+ listenerActor.tell(new EntityOwnershipChanged(entity2, true, false), ActorRef.noSender());
+
+ verify(mockListener, timeout(5000)).ownershipChanged(entity2, true, false);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.UntypedActorContext;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import scala.collection.Iterator;
+import scala.collection.immutable.Iterable;
+
+/**
+ * Unit tests for EntityOwnershipListenerSupport.
+ *
+ * @author Thomas Pantelis
+ */
+public class EntityOwnershipListenerSupportTest extends AbstractActorTest {
+ private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+
+ @After
+ public void tearDown() {
+ actorFactory.close();
+ }
+
+ @Test
+ public void testNotifyEntityOwnershipListeners() {
+ TestActorRef<DoNothingActor> actor = actorFactory.createTestActor(
+ Props.create(DoNothingActor.class), actorFactory.generateActorId("test"));
+
+ UntypedActorContext actorContext = actor.underlyingActor().getContext();
+ EntityOwnershipListenerSupport support = new EntityOwnershipListenerSupport(actorContext);
+
+ EntityOwnershipListener mockListener1 = mock(EntityOwnershipListener.class, "EntityOwnershipListener1");
+ EntityOwnershipListener mockListener2 = mock(EntityOwnershipListener.class, "EntityOwnershipListener2");
+ Entity entity1 = new Entity("test", YangInstanceIdentifier.of(QName.create("test", "id1")));
+ Entity entity2 = new Entity("test", YangInstanceIdentifier.of(QName.create("test", "id2")));
+
+ // Add EntityOwnershipListener registrations.
+
+ support.addEntityOwnershipListener(entity1, mockListener1);
+ support.addEntityOwnershipListener(entity2, mockListener1);
+ support.addEntityOwnershipListener(entity1, mockListener2);
+
+ // Notify entity1 changed and verify both listeners are notified.
+
+ support.notifyEntityOwnershipListeners(entity1, false, true);
+
+ verify(mockListener1, timeout(5000)).ownershipChanged(entity1, false, true);
+ verify(mockListener2, timeout(5000)).ownershipChanged(entity1, false, true);
+ assertEquals("# of listener actors", 2, actorContext.children().size());
+
+ // Notify entity2 changed and verify only mockListener1 is notified.
+
+ support.notifyEntityOwnershipListeners(entity2, false, true);
+
+ verify(mockListener1, timeout(5000)).ownershipChanged(entity2, false, true);
+ verify(mockListener2, never()).ownershipChanged(entity2, false, true);
+ assertEquals("# of listener actors", 2, actorContext.children().size());
+
+ // Notify entity3 changed and verify neither listener is notified.
+
+ Entity entity3 = new Entity("test", YangInstanceIdentifier.of(QName.create("test", "id3")));
+ support.notifyEntityOwnershipListeners(entity3, false, true);
+
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+
+ verify(mockListener1, never()).ownershipChanged(entity3, false, true);
+ verify(mockListener2, never()).ownershipChanged(entity3, false, true);
+
+ reset(mockListener1, mockListener2);
+
+ // Unregister mockListener1 for entity1, issue a change and verify only mockListener2 is notified.
+
+ support.removeEntityOwnershipListener(entity1, mockListener1);
+
+ support.notifyEntityOwnershipListeners(entity1, false, true);
+
+ verify(mockListener2, timeout(5000)).ownershipChanged(entity1, false, true);
+ verify(mockListener1, never()).ownershipChanged(entity1, false, true);
+
+ // Completely unregister both listeners and verify their listener actors are destroyed.
+
+ Iterable<ActorRef> listenerActors = actorContext.children();
+ assertEquals("# of listener actors", 2, listenerActors.size());
+
+ Iterator<ActorRef> iter = listenerActors.iterator();
+ ActorRef listenerActor1 = iter.next();
+ ActorRef listenerActor2 = iter.next();
+
+ JavaTestKit kit1 = new JavaTestKit(getSystem());
+ kit1.watch(listenerActor1);
+
+ JavaTestKit kit2 = new JavaTestKit(getSystem());
+ kit2.watch(listenerActor2);
+
+ support.removeEntityOwnershipListener(entity2, mockListener1);
+ support.removeEntityOwnershipListener(entity1, mockListener2);
+
+ kit1.expectTerminated(JavaTestKit.duration("3 seconds"), listenerActor1);
+ kit2.expectTerminated(JavaTestKit.duration("3 seconds"), listenerActor2);
+ assertEquals("# of listener actors", 0, actorContext.children().size());
+
+ // Re-register mockListener1 for entity1 and verify it is notified.
+
+ reset(mockListener1, mockListener2);
+
+ support.addEntityOwnershipListener(entity1, mockListener1);
+
+ support.notifyEntityOwnershipListeners(entity1, false, true);
+
+ verify(mockListener1, timeout(5000)).ownershipChanged(entity1, false, true);
+ verify(mockListener2, never()).ownershipChanged(entity2, false, true);
+
+ // Quickly register and unregister mockListener2 - expecting no exceptions.
+
+ support.addEntityOwnershipListener(entity1, mockListener2);
+ support.removeEntityOwnershipListener(entity1, mockListener2);
+ }
+}