<feature name="odl-nsf-service" description="OpenDaylight :: NSF :: Network Service Functions in Controller" version="${project.version}">
<feature version="${sal.version}">odl-adsal-all</feature>
- <feature version="${project.version}">odl-nsf-controller-managers</feature>
- <feature version="${project.version}">odl-adsal-controller-northbound</feature>
+ <feature version="${project.version}">odl-nsf-managers</feature>
+ <feature version="${project.version}">odl-adsal-northbound</feature>
</feature>
<feature name="odl-nsf-managers" description="OpenDaylight :: AD-SAL :: Network Service Functions" version="${project.version}">
<bundle>mvn:org.opendaylight.controller/topologymanager/${topologymanager.version}</bundle>
<bundle>mvn:org.opendaylight.controller/topologymanager.shell/${topologymanager.shell.version}</bundle>
-
- <bundle>mvn:org.opendaylight.controller/hosttracker/${hosttracker.api.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/hosttracker.implementation/${hosttracker.implementation.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/hosttracker.shell/${hosttracker.shell.version}</bundle>
-
- <bundle>mvn:org.opendaylight.controller/forwarding.staticrouting/${forwarding.staticrouting}</bundle>
-
- <bundle>mvn:org.opendaylight.controller.thirdparty/net.sf.jung2/2.0.1</bundle>
- <bundle>mvn:org.opendaylight.controller/routing.dijkstra_implementation/${routing.dijkstra_implementation.version}</bundle>
- </feature>
-
- <feature name="odl-nsf-controller-managers" description="OpenDaylight :: AD-SAL :: Network Service Functions in Controller" version="${project.version}">
- <feature version="${commons.opendaylight.version}">odl-base-all</feature>
- <feature version="${sal.version}">odl-adsal-all</feature>
- <bundle>mvn:org.opendaylight.controller/usermanager/${usermanager.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/usermanager.implementation/${usermanager.version}</bundle>
-
- <bundle>mvn:org.opendaylight.controller/appauth/${appauth.version}</bundle>
-
- <bundle>mvn:org.opendaylight.controller/connectionmanager/${connectionmanager.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/connectionmanager.implementation/${connectionmanager.version}</bundle>
-
- <bundle>mvn:org.opendaylight.controller/containermanager/${containermanager.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/containermanager.implementation/${containermanager.version}</bundle>
-
- <bundle>mvn:org.opendaylight.controller/statisticsmanager/${statisticsmanager.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/statisticsmanager.implementation/${statisticsmanager.implementation.version}</bundle>
-
- <bundle>mvn:org.opendaylight.controller/switchmanager/${switchmanager.api.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/switchmanager.implementation/${switchmanager.implementation.version}</bundle>
-
- <bundle>mvn:org.opendaylight.controller/forwardingrulesmanager/${forwardingrulesmanager.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/forwardingrulesmanager.implementation/${forwardingrulesmanager.implementation.version}</bundle>
-
- <bundle>mvn:org.opendaylight.controller/topologymanager/${topologymanager.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/topologymanager.shell/${topologymanager.shell.version}</bundle>
-
<bundle>mvn:org.opendaylight.controller/hosttracker/${hosttracker.api.version}</bundle>
<bundle>mvn:org.opendaylight.controller/hosttracker.implementation/${hosttracker.implementation.version}</bundle>
<bundle>mvn:org.opendaylight.controller/hosttracker.shell/${hosttracker.shell.version}</bundle>
<bundle>mvn:org.opendaylight.controller/topology.northbound/${topology.northbound.version}</bundle>
<bundle>mvn:org.opendaylight.controller/usermanager.northbound/${usermanager.northbound.version}</bundle>
</feature>
-
- <feature name="odl-adsal-controller-northbound" description="OpenDaylight :: AD-SAL :: Northbound APIs in Controller" version="${project.version}">
- <feature version="${commons.opendaylight.version}">odl-base-all</feature>
- <feature version="${project.version}">odl-nsf-managers</feature>
- <bundle>mvn:org.ow2.asm/asm-all/${asm.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/bundlescanner/${bundlescanner.api.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/bundlescanner.implementation/${bundlescanner.implementation.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/commons.northbound/${northbound.commons.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/connectionmanager.northbound/${connectionmanager.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/flowprogrammer.northbound/${flowprogrammer.northbound.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/hosttracker.northbound/${hosttracker.northbound.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/networkconfig.bridgedomain.northbound/${networkconfig.bridgedomain.northbound.version}</bundle>
- <bundle>mvn:org.eclipse.persistence/org.eclipse.persistence.antlr/${eclipse.persistence.version}</bundle>
- <bundle>mvn:org.eclipse.persistence/org.eclipse.persistence.core/${eclipse.persistence.version}</bundle>
- <bundle>mvn:org.eclipse.persistence/org.eclipse.persistence.moxy/${eclipse.persistence.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/forwarding.staticrouting.northbound/${forwarding.staticrouting.northbound.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/statistics.northbound/${statistics.northbound.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/subnets.northbound/${subnets.northbound.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/switchmanager.northbound/${switchmanager.northbound.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/topology.northbound/${topology.northbound.version}</bundle>
- <bundle>mvn:org.opendaylight.controller/usermanager.northbound/${usermanager.northbound.version}</bundle>
- </feature>
</features>
// to make it easier to read. Before refactoring ensure tests
// cover the code properly
+ if (snapshotTracker != null) {
+ // if snapshot install is in progress, follower should just acknowledge append entries with a reply.
+ AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
+ lastIndex(), lastTerm());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: snapshot install is in progress, replying immediately with {}", logName(), reply);
+ }
+ sender.tell(reply, actor());
+
+ return this;
+ }
+
// 1. Reply false if term < currentTerm (ยง5.1)
// This is handled in the appendEntries method of the base class
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.TestActorRef;
follower = createBehavior(context);
- HashMap<String, String> followerSnapshot = new HashMap<>();
- followerSnapshot.put("1", "A");
- followerSnapshot.put("2", "B");
- followerSnapshot.put("3", "C");
-
- ByteString bsSnapshot = toByteString(followerSnapshot);
+ ByteString bsSnapshot = createSnapshot();
int offset = 0;
int snapshotLength = bsSnapshot.size();
int chunkSize = 50;
assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
}
+
+ /**
+ * Verify that when an AppendEntries is sent to a follower during a snapshot install
+ * the Follower short-circuits the processing of the AppendEntries message.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
+ logStart("testReceivingAppendEntriesDuringInstallSnapshot");
+
+ MockRaftActorContext context = createActorContext();
+
+ follower = createBehavior(context);
+
+ ByteString bsSnapshot = createSnapshot();
+ int snapshotLength = bsSnapshot.size();
+ int chunkSize = 50;
+ int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
+ int lastIncludedIndex = 1;
+
+ // Check that snapshot installation is not in progress
+ assertNull(((Follower) follower).getSnapshotTracker());
+
+ // Make sure that we have more than 1 chunk to send
+ assertTrue(totalChunks > 1);
+
+ // Send an install snapshot with the first chunk to start the process of installing a snapshot
+ ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
+ follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
+ chunkData, 1, totalChunks));
+
+ // Check if snapshot installation is in progress now
+ assertNotNull(((Follower) follower).getSnapshotTracker());
+
+ // Send an append entry
+ AppendEntries appendEntries = mock(AppendEntries.class);
+ doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
+
+ follower.handleMessage(leaderActor, appendEntries);
+
+ AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+ assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
+ assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
+ assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
+
+ // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
+ verify(appendEntries, never()).getPrevLogIndex();
+
+ }
+
@Test
public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
logStart("testInitialSyncUpWithHandleInstallSnapshot");
follower = createBehavior(context);
- HashMap<String, String> followerSnapshot = new HashMap<>();
- followerSnapshot.put("1", "A");
- followerSnapshot.put("2", "B");
- followerSnapshot.put("3", "C");
-
- ByteString bsSnapshot = toByteString(followerSnapshot);
+ ByteString bsSnapshot = createSnapshot();
int offset = 0;
int snapshotLength = bsSnapshot.size();
int chunkSize = 50;
follower = createBehavior(context);
- HashMap<String, String> followerSnapshot = new HashMap<>();
- followerSnapshot.put("1", "A");
- followerSnapshot.put("2", "B");
- followerSnapshot.put("3", "C");
-
- ByteString bsSnapshot = toByteString(followerSnapshot);
+ ByteString bsSnapshot = createSnapshot();
InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
getNextChunk(bsSnapshot, 10, 50), 3, 3);
new MockRaftActorContext.MockPayload(data));
}
+ private ByteString createSnapshot(){
+ HashMap<String, String> followerSnapshot = new HashMap<>();
+ followerSnapshot.put("1", "A");
+ followerSnapshot.put("2", "B");
+ followerSnapshot.put("3", "C");
+
+ return toByteString(followerSnapshot);
+ }
+
@Override
protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
ActorRef actorRef, RaftRPC rpc) throws Exception {
DataChangeScope scope) {
Future<Object> future = actorContext.executeOperationAsync(shard,
- new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
+ new RegisterChangeListener(path, dataChangeListenerActor, scope),
actorContext.getDatastoreContext().getShardInitializationTimeout());
future.onComplete(new OnComplete<Object>(){
LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
persistenceId(), listenerRegistration.path());
- getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
+ getSender().tell(new RegisterChangeListenerReply(listenerRegistration), getSelf());
}
private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
package org.opendaylight.controller.cluster.datastore.messages;
import akka.actor.ActorPath;
+import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import akka.serialization.Serialization;
import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages;
ListenerRegistrationMessages.RegisterChangeListener.class;
private final YangInstanceIdentifier path;
- private final ActorPath dataChangeListenerPath;
+ private final ActorRef dataChangeListener;
private final AsyncDataBroker.DataChangeScope scope;
public RegisterChangeListener(YangInstanceIdentifier path,
- ActorPath dataChangeListenerPath,
+ ActorRef dataChangeListener,
AsyncDataBroker.DataChangeScope scope) {
this.path = path;
- this.dataChangeListenerPath = dataChangeListenerPath;
+ this.dataChangeListener = dataChangeListener;
this.scope = scope;
}
}
public ActorPath getDataChangeListenerPath() {
- return dataChangeListenerPath;
+ return dataChangeListener.path();
}
public ListenerRegistrationMessages.RegisterChangeListener toSerializable() {
return ListenerRegistrationMessages.RegisterChangeListener.newBuilder()
.setInstanceIdentifierPath(InstanceIdentifierUtils.toSerializable(path))
- .setDataChangeListenerActorPath(dataChangeListenerPath.toString())
+ .setDataChangeListenerActorPath(Serialization.serializedActorPath(dataChangeListener))
.setDataChangeScope(scope.ordinal()).build();
}
- public static RegisterChangeListener fromSerializable(ActorSystem actorSystem,Object serializable){
+ public static RegisterChangeListener fromSerializable(ActorSystem actorSystem, Object serializable){
ListenerRegistrationMessages.RegisterChangeListener o = (ListenerRegistrationMessages.RegisterChangeListener) serializable;
return new RegisterChangeListener(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPath()),
- actorSystem.actorFor(o.getDataChangeListenerActorPath()).path(),
+ actorSystem.provider().resolveActorRef(o.getDataChangeListenerActorPath()),
AsyncDataBroker.DataChangeScope.values()[o.getDataChangeScope()]);
}
package org.opendaylight.controller.cluster.datastore.messages;
import akka.actor.ActorPath;
+import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import akka.serialization.Serialization;
import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages;
public class RegisterChangeListenerReply implements SerializableMessage{
public static final Class<ListenerRegistrationMessages.RegisterChangeListenerReply> SERIALIZABLE_CLASS =
ListenerRegistrationMessages.RegisterChangeListenerReply.class;
- private final ActorPath listenerRegistrationPath;
+ private final ActorRef listenerRegistration;
- public RegisterChangeListenerReply(final ActorPath listenerRegistrationPath) {
- this.listenerRegistrationPath = listenerRegistrationPath;
+ public RegisterChangeListenerReply(final ActorRef listenerRegistration) {
+ this.listenerRegistration = listenerRegistration;
}
public ActorPath getListenerRegistrationPath() {
- return listenerRegistrationPath;
+ return listenerRegistration.path();
}
@Override
public ListenerRegistrationMessages.RegisterChangeListenerReply toSerializable() {
return ListenerRegistrationMessages.RegisterChangeListenerReply.newBuilder()
- .setListenerRegistrationPath(listenerRegistrationPath.toString()).build();
+ .setListenerRegistrationPath(Serialization.serializedActorPath(listenerRegistration)).build();
}
public static RegisterChangeListenerReply fromSerializable(final ActorSystem actorSystem,final Object serializable){
ListenerRegistrationMessages.RegisterChangeListenerReply o = (ListenerRegistrationMessages.RegisterChangeListenerReply) serializable;
return new RegisterChangeListenerReply(
- actorSystem.actorFor(o.getListenerRegistrationPath()).path()
+ actorSystem.provider().resolveActorRef(o.getListenerRegistrationPath())
);
}
}
Assert.assertEquals("getPath", path, registerMsg.getPath());
Assert.assertEquals("getScope", scope, registerMsg.getScope());
- reply(new RegisterChangeListenerReply(getRef().path()));
+ reply(new RegisterChangeListenerReply(getRef()));
for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) {
Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
@Override
public Future<Object> answer(InvocationOnMock invocation) {
proxy.close();
- return Futures.successful((Object)new RegisterChangeListenerReply(getRef().path()));
+ return Futures.successful((Object)new RegisterChangeListenerReply(getRef()));
}
};
"testRegisterChangeListener-DataChangeListener");
shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
- dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
+ dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
RegisterChangeListenerReply.class);
onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
// Now send the RegisterChangeListener and wait for the reply.
- shard.tell(new RegisterChangeListener(path, dclActor.path(),
+ shard.tell(new RegisterChangeListener(path, dclActor,
AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
--- /dev/null
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static junit.framework.TestCase.assertEquals;
+import akka.actor.Actor;
+import akka.serialization.Serialization;
+import akka.testkit.TestActorRef;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages;
+
+public class RegisterChangeListenerReplyTest extends AbstractActorTest {
+
+ private TestActorFactory factory;
+
+
+ @Before
+ public void setUp(){
+ factory = new TestActorFactory(getSystem());
+ }
+
+ @After
+ public void shutDown(){
+ factory.close();
+ }
+
+ @Test
+ public void testToSerializable(){
+ TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
+
+ RegisterChangeListenerReply registerChangeListenerReply = new RegisterChangeListenerReply(testActor);
+
+ ListenerRegistrationMessages.RegisterChangeListenerReply serialized
+ = registerChangeListenerReply.toSerializable();
+
+ assertEquals(Serialization.serializedActorPath(testActor), serialized.getListenerRegistrationPath());
+ }
+
+ @Test
+ public void testFromSerializable(){
+ TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
+
+ RegisterChangeListenerReply registerChangeListenerReply = new RegisterChangeListenerReply(testActor);
+
+ ListenerRegistrationMessages.RegisterChangeListenerReply serialized
+ = registerChangeListenerReply.toSerializable();
+
+
+ RegisterChangeListenerReply fromSerialized
+ = RegisterChangeListenerReply.fromSerializable(getSystem(), serialized);
+
+ assertEquals(testActor.path().toString(), fromSerialized.getListenerRegistrationPath().toString());
+ }
+
+}
\ No newline at end of file
--- /dev/null
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static junit.framework.TestCase.assertEquals;
+import akka.actor.Actor;
+import akka.serialization.Serialization;
+import akka.testkit.TestActorRef;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages;
+
+public class RegisterChangeListenerTest extends AbstractActorTest {
+
+ private TestActorFactory factory;
+
+ @Before
+ public void setUp(){
+ factory = new TestActorFactory(getSystem());
+ }
+
+ @After
+ public void shutDown(){
+ factory.close();
+ }
+
+ @Test
+ public void testToSerializable(){
+ TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
+ RegisterChangeListener registerChangeListener = new RegisterChangeListener(TestModel.TEST_PATH, testActor
+ , AsyncDataBroker.DataChangeScope.BASE);
+
+ ListenerRegistrationMessages.RegisterChangeListener serialized
+ = registerChangeListener.toSerializable();
+
+ NormalizedNodeMessages.InstanceIdentifier path = serialized.getInstanceIdentifierPath();
+
+ assertEquals("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", path.getCode(0));
+ assertEquals(Serialization.serializedActorPath(testActor), serialized.getDataChangeListenerActorPath());
+ assertEquals(AsyncDataBroker.DataChangeScope.BASE.ordinal(), serialized.getDataChangeScope());
+
+ }
+
+ @Test
+ public void testFromSerializable(){
+ TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
+ RegisterChangeListener registerChangeListener = new RegisterChangeListener(TestModel.TEST_PATH, testActor
+ , AsyncDataBroker.DataChangeScope.SUBTREE);
+
+ ListenerRegistrationMessages.RegisterChangeListener serialized
+ = registerChangeListener.toSerializable();
+
+
+ RegisterChangeListener fromSerialized = RegisterChangeListener.fromSerializable(getSystem(), serialized);
+
+ assertEquals(TestModel.TEST_PATH, registerChangeListener.getPath());
+ assertEquals(testActor.path().toString(), fromSerialized.getDataChangeListenerPath().toString());
+ assertEquals(AsyncDataBroker.DataChangeScope.SUBTREE, fromSerialized.getScope());
+
+
+ }
+}
\ No newline at end of file
@Override
public void onComplete(final Throwable failure, final Object reply) throws Throwable {
if(failure != null) {
- LOG.error("InvokeRpc failed", failure);
+
+ // When we return a failure to the caller they can choose to log it if they like
+ // so here we just do basic warn logging by default and log the stack trace only when debug
+ // is enabled
+
+ LOG.warn("InvokeRpc failed rpc = {}, identifier = {}", rpcMsg.getRpc(), rpcMsg.getIdentifier());
+
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Detailed Error", failure);
+ }
final String message = String.format("Execution of RPC %s failed", rpcMsg.getRpc());
Collection<RpcError> errors = ((RpcErrorsException)failure).getRpcErrors();