Merge "Do not override jsr305 version"
authorTony Tkacik <ttkacik@cisco.com>
Mon, 23 Feb 2015 10:43:00 +0000 (10:43 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 23 Feb 2015 10:43:00 +0000 (10:43 +0000)
179 files changed:
features/mdsal/pom.xml
features/mdsal/src/main/resources/features.xml
features/netconf-connector/pom.xml
features/netconf-connector/src/main/resources/features.xml
features/netconf/pom.xml
opendaylight/commons/opendaylight/pom.xml
opendaylight/md-sal/mdsal-artifacts/pom.xml
opendaylight/md-sal/messagebus-api/pom.xml [new file with mode: 0644]
opendaylight/md-sal/messagebus-api/src/main/yang/event-aggregator.yang [new file with mode: 0644]
opendaylight/md-sal/messagebus-api/src/main/yang/event-source.yang [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/pom.xml [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModule.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleFactory.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/DataStore.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/InitializationContext.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/MdSAL.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/Providers.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventAggregator.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceManager.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSource.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Topic.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Util.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/yang/messagebus-app-impl.yang [new file with mode: 0644]
opendaylight/md-sal/pom.xml
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyState.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CaptureSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntries.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVote.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVoteReply.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/TestActorFactory.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeaderTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/ForwardMessageToBehaviorActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java
opendaylight/md-sal/sal-akka-raft/src/test/resources/simplelogger.properties
opendaylight/md-sal/sal-binding-dom-it/pom.xml
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/md/sal/binding/data/ConcurrentImplicitCreateTest.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/md/sal/binding/data/WildcardedDataChangeListenerTest.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/bugfix/DOMCodecBug02Test.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/bugfix/DOMCodecBug03Test.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/bugfix/DeleteNestedAugmentationListenParentTest.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/bugfix/FlagsSerializationTest.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/bugfix/PutAugmentationTest.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/bugfix/WriteParentListenAugmentTest.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/bugfix/WriteParentReadChildTest.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/connect/dom/BrokerIntegrationTest.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/connect/dom/ChangeOriginatedInDomBrokerTest.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/connect/dom/CrossBrokerMountPointTest.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/connect/dom/CrossBrokerRpcTest.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/connect/dom/DOMRpcServiceTestBugfix560.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/connect/dom/MessageCapturingFlowService.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActor.java
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/util/compat/DataNormalizer.java
opendaylight/md-sal/sal-common-impl/src/test/java/org/opendaylight/controller/md/sal/common/impl/util/compat/DataNormalizerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
opendaylight/md-sal/sal-dummy-distributed-datastore/src/main/java/org/opendaylight/controller/dummy/datastore/DummyShard.java
opendaylight/md-sal/sal-dummy-distributed-datastore/src/main/resources/simplelogger.properties [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/config/yang/md/sal/connector/netconf/NetconfConnectorModule.java
opendaylight/md-sal/sal-test-model/src/main/yang/opendaylight-of-migration-test-model.yang [new file with mode: 0644]
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/Activator.java
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/NetconfOperationServiceFactoryImpl.java
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/NetconfOperationServiceImpl.java
opendaylight/netconf/config-netconf-connector/src/main/java/org/opendaylight/controller/netconf/confignetconfconnector/osgi/YangStoreService.java
opendaylight/netconf/config-netconf-connector/src/test/java/org/opendaylight/controller/netconf/confignetconfconnector/NetconfMappingTest.java
opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/ConfigPusherImpl.java
opendaylight/netconf/config-persister-impl/src/main/java/org/opendaylight/controller/netconf/persist/impl/osgi/ConfigPersisterActivator.java
opendaylight/netconf/config-persister-impl/src/test/java/org/opendaylight/controller/netconf/persist/impl/osgi/ConfigPersisterTest.java
opendaylight/netconf/config-persister-impl/src/test/java/org/opendaylight/controller/netconf/persist/impl/osgi/MockedBundleContext.java
opendaylight/netconf/mdsal-netconf-connector/pom.xml [new file with mode: 0644]
opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/config/yang/netconf/mdsal/mapper/NetconfMdsalMapperModule.java [new file with mode: 0644]
opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/config/yang/netconf/mdsal/mapper/NetconfMdsalMapperModuleFactory.java [new file with mode: 0644]
opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/CurrentSchemaContext.java [new file with mode: 0644]
opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/MdsalNetconfOperationService.java [new file with mode: 0644]
opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/MdsalNetconfOperationServiceFactory.java [new file with mode: 0644]
opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/OperationProvider.java [new file with mode: 0644]
opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/TransactionProvider.java [new file with mode: 0644]
opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/Commit.java [new file with mode: 0644]
opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/Datastore.java [new file with mode: 0644]
opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/DiscardChanges.java [new file with mode: 0644]
opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/EditConfig.java [new file with mode: 0644]
opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/Lock.java [new file with mode: 0644]
opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/Unlock.java [new file with mode: 0644]
opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/get/AbstractGet.java [new file with mode: 0644]
opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/get/Get.java [new file with mode: 0644]
opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/get/GetConfig.java [new file with mode: 0644]
opendaylight/netconf/mdsal-netconf-connector/src/main/yang/netconf-mdsal-mapper.yang [new file with mode: 0644]
opendaylight/netconf/netconf-api/pom.xml
opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/Capability.java [moved from opendaylight/netconf/netconf-mapping-api/src/main/java/org/opendaylight/controller/netconf/mapping/api/Capability.java with 93% similarity]
opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/NetconfServerDispatcher.java [new file with mode: 0644]
opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/monitoring/CapabilityListener.java [new file with mode: 0644]
opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/monitoring/NetconfMonitoringService.java
opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/monitoring/SessionListener.java [moved from opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/osgi/SessionMonitoringService.java with 59% similarity]
opendaylight/netconf/netconf-api/src/main/java/org/opendaylight/controller/netconf/api/util/NetconfConstants.java [new file with mode: 0644]
opendaylight/netconf/netconf-api/src/main/yang/netconf-northbound.yang [new file with mode: 0644]
opendaylight/netconf/netconf-artifacts/pom.xml
opendaylight/netconf/netconf-config/src/main/resources/initial/01-netconf.xml
opendaylight/netconf/netconf-connector-config/src/main/resources/initial/99-netconf-connector.xml
opendaylight/netconf/netconf-impl/pom.xml
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/config/yang/config/netconf/northbound/impl/NetconfServerDispatcherModule.java [new file with mode: 0644]
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/config/yang/config/netconf/northbound/impl/NetconfServerDispatcherModuleFactory.java [new file with mode: 0644]
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/CapabilityProviderImpl.java [deleted file]
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/CommitNotifier.java [new file with mode: 0644]
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/DefaultCommitNotificationProducer.java
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerDispatcherImpl.java [moved from opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerDispatcher.java with 86% similarity]
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSessionListener.java
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSessionListenerFactory.java [deleted file]
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSessionNegotiatorFactory.java
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/mapping/CapabilityProvider.java [deleted file]
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/mapping/operations/DefaultCommit.java
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/osgi/AggregatedNetconfOperationServiceFactory.java [new file with mode: 0644]
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/osgi/NetconfImplActivator.java
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/osgi/NetconfMonitoringServiceImpl.java
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/osgi/NetconfOperationRouterImpl.java
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/osgi/NetconfOperationServiceFactoryListenerImpl.java [deleted file]
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/osgi/NetconfOperationServiceFactoryTracker.java
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/osgi/NetconfOperationServiceSnapshotImpl.java [deleted file]
opendaylight/netconf/netconf-impl/src/main/yang/netconf-northbound-impl.yang [new file with mode: 0644]
opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/ConcurrentClientsTest.java
opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/NetconfDispatcherImplTest.java
opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/NetconfMonitoringServiceImplTest.java
opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/mapping/operations/DefaultCommitTest.java
opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/osgi/NetconfImplActivatorTest.java
opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/osgi/NetconfOperationServiceFactoryTrackerTest.java
opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/AbstractNetconfConfigTest.java
opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfConfigPersisterITTest.java
opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfITMonitoringTest.java
opendaylight/netconf/netconf-mapping-api/pom.xml
opendaylight/netconf/netconf-mapping-api/src/main/java/org/opendaylight/controller/netconf/mapping/api/NetconfOperationProvider.java [deleted file]
opendaylight/netconf/netconf-mapping-api/src/main/java/org/opendaylight/controller/netconf/mapping/api/NetconfOperationService.java
opendaylight/netconf/netconf-mapping-api/src/main/java/org/opendaylight/controller/netconf/mapping/api/NetconfOperationServiceFactory.java
opendaylight/netconf/netconf-mapping-api/src/main/java/org/opendaylight/controller/netconf/mapping/api/NetconfOperationServiceSnapshot.java [deleted file]
opendaylight/netconf/netconf-mapping-api/src/main/yang/netconf-northbound-mapper.yang [new file with mode: 0644]
opendaylight/netconf/netconf-mdsal-config/pom.xml [new file with mode: 0644]
opendaylight/netconf/netconf-mdsal-config/src/main/resources/initial/08-netconf-mdsal.xml [new file with mode: 0644]
opendaylight/netconf/netconf-monitoring/src/main/java/org/opendaylight/controller/netconf/monitoring/GetSchema.java [moved from opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/mapping/operations/DefaultGetSchema.java with 75% similarity]
opendaylight/netconf/netconf-monitoring/src/main/java/org/opendaylight/controller/netconf/monitoring/osgi/NetconfMonitoringActivator.java
opendaylight/netconf/netconf-monitoring/src/main/java/org/opendaylight/controller/netconf/monitoring/osgi/NetconfMonitoringOperationService.java
opendaylight/netconf/netconf-monitoring/src/main/java/org/opendaylight/controller/netconf/monitoring/osgi/NetconfMonitoringServiceTracker.java
opendaylight/netconf/netconf-monitoring/src/test/java/org/opendaylight/controller/netconf/monitoring/GetSchemaTest.java [moved from opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/mapping/operations/DefaultGetSchemaTest.java with 82% similarity]
opendaylight/netconf/netconf-monitoring/src/test/java/org/opendaylight/controller/netconf/monitoring/osgi/NetconfMonitoringOperationServiceTest.java
opendaylight/netconf/netconf-monitoring/src/test/java/org/opendaylight/controller/netconf/monitoring/xml/JaxBSerializerTest.java
opendaylight/netconf/netconf-notifications-impl/src/main/java/org/opendaylight/controller/netconf/notifications/impl/osgi/Activator.java
opendaylight/netconf/netconf-ssh/pom.xml
opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/config/yang/netconf/northbound/ssh/NetconfNorthboundSshModule.java [new file with mode: 0644]
opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/config/yang/netconf/northbound/ssh/NetconfNorthboundSshModuleFactory.java [new file with mode: 0644]
opendaylight/netconf/netconf-ssh/src/main/yang/netconf-northbound-ssh.yang [new file with mode: 0644]
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/FakeModuleBuilderCapability.java
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/ModuleBuilderCapability.java
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/NetconfDeviceSimulator.java
opendaylight/netconf/pom.xml

index 5e6afd2..c63b39c 100644 (file)
       <classifier>features</classifier>
       <type>xml</type>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>features-netconf</artifactId>
+      <classifier>features</classifier>
+      <type>xml</type>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>features-config-persister</artifactId>
index 8c166e6..5b9f4a6 100644 (file)
@@ -7,6 +7,7 @@
     <repository>mvn:org.opendaylight.controller/features-config/${config.version}/xml/features</repository>
     <repository>mvn:org.opendaylight.controller/features-config-persister/${config.version}/xml/features</repository>
     <repository>mvn:org.opendaylight.controller/features-config-netty/${config.version}/xml/features</repository>
+    <repository>mvn:org.opendaylight.controller/features-netconf/${netconf.version}/xml/features</repository>
     <repository>mvn:org.opendaylight.controller/features-akka/${commons.opendaylight.version}/xml/features</repository>
     <feature name='odl-mdsal-all' version='${project.version}' description="OpenDaylight :: MDSAL :: All">
         <feature version='${project.version}'>odl-mdsal-broker</feature>
       <bundle>mvn:org.opendaylight.controller/sal-common-impl/${mdsal.version}</bundle>
       <bundle>mvn:org.opendaylight.controller/sal-common-util/${mdsal.version}</bundle>
     </feature>
+
+    <!-- TODO move to netconf features, however there are some weird dependencies on features-config-persister all over that cause cyclic dependencies-->
+    <feature name='odl-netconf-mdsal' version='${project.version}' description="OpenDaylight :: Netconf :: All">
+        <feature version='${config.version}'>odl-config-all</feature>
+        <feature version='${netconf.version}'>odl-netconf-all</feature>
+        <bundle>mvn:org.opendaylight.controller/netconf-ssh/${netconf.version}</bundle>
+        <feature version='${mdsal.version}'>odl-mdsal-broker</feature>
+        <bundle>mvn:org.opendaylight.controller/mdsal-netconf-connector/${netconf.version}</bundle>
+        <!-- TODO 01-netconf.xml file requires netconf-config-dispatcher to be present and its part of netconf-connector features. Clean Up-->
+        <bundle>mvn:org.opendaylight.controller/netconf-config-dispatcher/${config.version}</bundle>
+        <configfile finalname='${config.configfile.directory}/${config.netconf.client.configfile}'>mvn:org.opendaylight.controller/netconf-config/${netconf.version}/xml/config</configfile>
+        <configfile finalname='${config.configfile.directory}/${config.netconf.mdsal.configfile}'>mvn:org.opendaylight.controller/netconf-mdsal-config/${netconf.version}/xml/config</configfile>
+    </feature>
+
     <feature name='odl-mdsal-broker' version='${project.version}' description="OpenDaylight :: MDSAL :: Broker">
         <feature version='${yangtools.version}'>odl-yangtools-common</feature>
         <feature version='${yangtools.version}'>odl-yangtools-binding</feature>
index c69ee19..4e94996 100644 (file)
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>netconf-config-dispatcher</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>mdsal-netconf-connector</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>netconf-tcp</artifactId>
index 863833b..92e6507 100644 (file)
@@ -42,6 +42,8 @@
         -->
         <feature version='${project.version}'>odl-netconf-connector</feature>
         <feature version='${project.version}'>odl-netconf-connector-ssh</feature>
+
+
     </feature>
     <!--
         Necessary TODO: Define your features.  It is useful to list then in order of dependency.  So if A depends on B, list A first.
index 997b6a2..4edd936 100644 (file)
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>netconf-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>netconf-config</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>netconf-mdsal-config</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>netconf-auth</artifactId>
index 39c5672..43ac4ae 100644 (file)
@@ -15,7 +15,7 @@
 
   <properties>
 
-    <akka.version>2.3.4</akka.version>
+    <akka.version>2.3.9</akka.version>
     <appauth.version>0.5.0-SNAPSHOT</appauth.version>
     <archetype-app-northbound>0.1.0-SNAPSHOT</archetype-app-northbound>
     <arphandler.version>0.6.0-SNAPSHOT</arphandler.version>
@@ -61,6 +61,7 @@
     <config.xsql.configfile>04-xsql.xml</config.xsql.configfile>
     <config.netconf.client.configfile>01-netconf.xml</config.netconf.client.configfile>
     <config.toaster.configfile>03-toaster-sample.xml</config.toaster.configfile>
+    <config.netconf.mdsal.configfile>08-mdsal-netconf.xml</config.netconf.mdsal.configfile>
     <config.restconf.configfile>10-rest-connector.xml</config.restconf.configfile>
     <config.netconf.connector.configfile>99-netconf-connector.xml</config.netconf.connector.configfile>
     <configuration.implementation.version>0.5.0-SNAPSHOT</configuration.implementation.version>
index f88e09c..420f888 100644 (file)
                 <type>xml</type>
             </dependency>
 
+            <!-- MessageBus -->
+            <dependency>
+                <groupId>org.opendaylight.controller</groupId>
+                <artifactId>message-bus-api</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.opendaylight.controller</groupId>
+                <artifactId>message-bus-impl</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+
         </dependencies>
     </dependencyManagement>
 </project>
diff --git a/opendaylight/md-sal/messagebus-api/pom.xml b/opendaylight/md-sal/messagebus-api/pom.xml
new file mode 100644 (file)
index 0000000..542308a
--- /dev/null
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+
+This program and the accompanying materials are made available under the
+terms of the Eclipse Public License v1.0 which accompanies this distribution,
+and is available at http://www.eclipse.org/legal/epl-v10.html
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.opendaylight.controller</groupId>
+        <artifactId>sal-parent</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>message-bus-api</artifactId>
+    <name>${project.artifactId}</name>
+
+    <packaging>bundle</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.opendaylight.controller.model</groupId>
+            <artifactId>model-inventory</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools.model</groupId>
+            <artifactId>yang-ext</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools.model</groupId>
+            <artifactId>ietf-topology</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.opendaylight.yangtools</groupId>
+                <artifactId>yang-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate-sources</goal>
+                        </goals>
+                        <configuration>
+                            <codeGenerators>
+                                <generator>
+                                    <codeGeneratorClass>org.opendaylight.yangtools.maven.sal.api.gen.plugin.CodeGeneratorImpl</codeGeneratorClass>
+                                    <outputBaseDir>${project.build.directory}/generated-sources/sal</outputBaseDir>
+                                </generator>
+                                <generator>
+                                    <codeGeneratorClass>org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator</codeGeneratorClass>
+                                    <outputBaseDir>${project.build.directory}/generated-sources/config</outputBaseDir>
+                                    <additionalConfiguration>
+                                        <namespaceToPackage1>urn:opendaylight:params:xml:ns:yang:controller==org.opendaylight.controller.config.yang</namespaceToPackage1>
+                                    </additionalConfiguration>
+                                </generator>
+                                <generator>
+                                    <codeGeneratorClass>org.opendaylight.yangtools.yang.unified.doc.generator.maven.DocumentationGeneratorImpl</codeGeneratorClass>
+                                    <outputBaseDir>target/site/models</outputBaseDir>
+                                </generator>
+                            </codeGenerators>
+                            <inspectDependencies>true</inspectDependencies>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <version>1.8</version>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>${project.build.directory}/generated-sources/config</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <extensions>true</extensions>
+                <configuration>
+                    <instructions>
+                        <Export-Package>org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.*</Export-Package>
+                    </instructions>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/opendaylight/md-sal/messagebus-api/src/main/yang/event-aggregator.yang b/opendaylight/md-sal/messagebus-api/src/main/yang/event-aggregator.yang
new file mode 100644 (file)
index 0000000..ad7b573
--- /dev/null
@@ -0,0 +1,131 @@
+module event-aggregator {
+    // FIXME: this module needs to be split up to concepts and API
+    //        as the concepts are shared with the other model in this
+    //        package.
+    yang-version 1;
+    namespace "urn:cisco:params:xml:ns:yang:messagebus:eventaggregator";
+    prefix "eventaggregator";
+
+    organization "Cisco Systems, Inc.";
+    contact "Robert Gallas";
+
+    description
+        "Module implementing message but RPC.
+
+        Copyright (c)2014 Cisco Systems, Inc. All rights reserved.
+
+        This program and the accompanying materials are made available
+        under the terms of the Eclipse Public License v1.0 which
+        accompanies this distribution, and is available at
+        http://www.eclipse.org/legal/epl-v10.html";
+
+    revision "2014-12-02" {
+        description "Initial revision";
+    }
+
+    typedef pattern {
+        type string {
+            length 1..max;
+        }
+
+        // FIXME: make this a regular expression
+        description "A match pattern. Specifically this is a wildcard pattern.";
+    }
+
+    typedef notification-pattern {
+        type pattern;
+        description
+            "Pattern for matching candidate notification types. This pattern is to be
+            applied against the concatenation of the namespace of the module which
+            defines that particular notification, followed by a single colon, and
+            then followed by notification identifier, as supplied in the argument to
+            the notification statement.";
+    }
+
+    typedef topic-id {
+        type string {
+            length 1..max;
+        }
+        description
+            "A topic identifier. It uniquely defines a topic as seen by the the user
+            of this model's RPCs";
+    }
+
+    // FIXME: we would really like to share instances here, but that requires some sort
+    //        of sane reference counting. The reason for sharing is the data path part
+    //        of notification delivery -- multiple creators of topics can still share
+    //        a single data path.
+    rpc create-topic {
+        description
+            "Create a new topic. A topic is an aggregation of several notification
+            types from a set of nodes. Each successful invocation results in a unique
+            topic being created. The caller is responsible for removing the topic
+            once it is no longer needed.";
+
+        input {
+            leaf notification-pattern {
+                type notification-pattern;
+                mandatory true;
+                description
+                    "Pattern matching notification which should be forwarded into this
+                    topic.";
+            }
+
+            leaf node-id-pattern {
+                type pattern;
+                mandatory true;
+                description
+                    "Pattern for matching candidate event source nodes when looking
+                    for contributors to the topic. The pattern will be applied against
+                    /network-topology/topology/node/node-id";
+            }
+        }
+
+        output {
+            leaf topic-id {
+                type topic-id;
+                mandatory true;
+            }
+        }
+    }
+
+    rpc destroy-topic {
+        description
+            "Destroy a topic. No further messages will be delivered to it.";
+
+        input {
+            leaf topic-id {
+                type topic-id;
+                mandatory true;
+            }
+        }
+    }
+
+    notification topic-notification {
+        description
+            "Notification of an event occuring on a particular node. This notification
+            acts as an encapsulation for the event being delivered.";
+
+        leaf topic-id {
+            type topic-id;
+            mandatory true;
+            description
+                "Topic to which this event is being delivered.";
+        }
+
+        leaf node-id {
+            // FIXME: should be topology node ID
+            type string;
+            mandatory true;
+            description
+                "Node ID of the node which generated the event.";
+        }
+
+        anyxml payload {
+            mandatory true;
+            description
+                "Encapsulated notification. The format is the XML representation of
+                a notification according to RFC6020 section 7.14.2.";
+        }
+    }
+}
diff --git a/opendaylight/md-sal/messagebus-api/src/main/yang/event-source.yang b/opendaylight/md-sal/messagebus-api/src/main/yang/event-source.yang
new file mode 100644 (file)
index 0000000..5dd416c
--- /dev/null
@@ -0,0 +1,85 @@
+module event-source {
+    yang-version 1;
+    namespace "urn:cisco:params:xml:ns:yang:messagebus:eventsource";
+    prefix "eventsource";
+
+    import event-aggregator { prefix aggr; }
+    import network-topology { prefix nt; revision-date "2013-10-21"; }
+    import opendaylight-inventory {prefix inv; revision-date "2013-08-19"; }
+    import yang-ext {prefix ext; revision-date "2013-07-09"; }
+
+    organization "Cisco Systems, Inc.";
+    contact "Robert Gallas";
+
+    description
+        "Base model for a topology where individual nodes can produce events.
+
+        Module implementing event source topology and encapped notification.
+
+        Copyright (c)2014 Cisco Systems, Inc. All rights reserved.
+
+        This program and the accompanying materials are made available
+        under the terms of the Eclipse Public License v1.0 which
+        accompanies this distribution, and is available at
+        http://www.eclipse.org/legal/epl-v10.html";
+
+    revision "2014-12-02" {
+        description "first revision";
+    }
+
+    // FIXME: expand this
+    typedef join-topic-status {
+        type enumeration {
+            enum up;
+            enum down;
+        }
+        description "Object status";
+    }
+
+    // FIXME: migrate to topology
+    typedef node-ref {
+        type leafref {
+            path "/inv:nodes/inv:node/inv:id";
+        }
+    }
+
+    grouping topology-event-source-type {
+        container topology-event-source {
+            presence "indicates an event source-aware topology";
+        }
+    }
+
+    rpc join-topic {
+        input {
+            leaf node {
+               ext:context-reference "inv:node-context";
+               type "instance-identifier";
+            }
+            leaf topic-id {
+                type aggr:topic-id;
+                description "in current implementation notification-pattern is defined by topic-id.
+                             By persisting topic definition we could omit notification-pattern";
+            }
+            leaf notification-pattern {
+                type aggr:notification-pattern;
+            }
+        }
+
+        output {
+            leaf status {
+                type join-topic-status;
+            }
+        }
+    }
+
+    augment "/nt:network-topology/nt:topology/nt:topology-types" {
+        uses topology-event-source-type;
+    }
+
+    augment "/nt:network-topology/nt:topology/nt:node" {
+        when "../../nt:topology-types/topology-event-source";
+        leaf event-source-node {
+            type node-ref;
+        }
+    }
+}
diff --git a/opendaylight/md-sal/messagebus-impl/pom.xml b/opendaylight/md-sal/messagebus-impl/pom.xml
new file mode 100644 (file)
index 0000000..8e088ba
--- /dev/null
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="UTF-8"?>\r
+<project xmlns="http://maven.apache.org/POM/4.0.0"\r
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"\r
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">\r
+    <modelVersion>4.0.0</modelVersion>\r
+\r
+    <parent>\r
+        <groupId>org.opendaylight.controller</groupId>\r
+        <artifactId>sal-parent</artifactId>\r
+        <version>1.2.0-SNAPSHOT</version>\r
+    </parent>\r
+\r
+    <artifactId>message-bus-impl</artifactId>\r
+    <name>${project.artifactId}</name>\r
+\r
+    <packaging>bundle</packaging>\r
+\r
+    <dependencies>\r
+        <dependency>\r
+            <groupId>org.opendaylight.controller</groupId>\r
+            <artifactId>ietf-netconf-notifications</artifactId>\r
+            <version>0.3.0-SNAPSHOT</version>\r
+        </dependency>\r
+        <dependency>\r
+            <groupId>org.opendaylight.controller</groupId>\r
+            <artifactId>sal-binding-api</artifactId>\r
+        </dependency>\r
+        <dependency>\r
+            <groupId>org.opendaylight.controller</groupId>\r
+            <artifactId>sal-core-api</artifactId>\r
+        </dependency>\r
+        <dependency>\r
+            <groupId>org.opendaylight.controller</groupId>\r
+            <artifactId>sal-common-util</artifactId>\r
+        </dependency>\r
+        <dependency>\r
+            <groupId>org.opendaylight.yangtools</groupId>\r
+            <artifactId>yang-data-impl</artifactId>\r
+        </dependency>\r
+        <dependency>\r
+            <groupId>org.opendaylight.controller</groupId>\r
+            <artifactId>config-api</artifactId>\r
+        </dependency>\r
+        <dependency>\r
+            <groupId>org.opendaylight.controller</groupId>\r
+            <artifactId>message-bus-api</artifactId>\r
+        </dependency>\r
+        <dependency>\r
+            <groupId>org.opendaylight.controller</groupId>\r
+            <artifactId>sal-binding-config</artifactId>\r
+        </dependency>\r
+    </dependencies>\r
+\r
+    <build>\r
+        <plugins>\r
+            <plugin>\r
+                <groupId>org.opendaylight.yangtools</groupId>\r
+                <artifactId>yang-maven-plugin</artifactId>\r
+                <executions>\r
+                    <execution>\r
+                        <goals>\r
+                            <goal>generate-sources</goal>\r
+                        </goals>\r
+                        <configuration>\r
+                            <codeGenerators>\r
+                                <generator>\r
+                                    <codeGeneratorClass>\r
+                                        org.opendaylight.yangtools.maven.sal.api.gen.plugin.CodeGeneratorImpl\r
+                                    </codeGeneratorClass>\r
+                                    <outputBaseDir>\r
+                                        ${project.build.directory}/generated-sources/sal\r
+                                    </outputBaseDir>\r
+                                </generator>\r
+                                <generator>\r
+                                    <codeGeneratorClass>\r
+                                        org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator\r
+                                    </codeGeneratorClass>\r
+                                    <outputBaseDir>${project.build.directory}/generated-sources/config</outputBaseDir>\r
+                                    <additionalConfiguration>\r
+                                        <namespaceToPackage1>\r
+                                            urn:opendaylight:params:xml:ns:yang:controller==org.opendaylight.controller.config.yang\r
+                                        </namespaceToPackage1>\r
+                                    </additionalConfiguration>\r
+                                </generator>\r
+                                <generator>\r
+                                    <codeGeneratorClass>org.opendaylight.yangtools.yang.unified.doc.generator.maven.DocumentationGeneratorImpl</codeGeneratorClass>\r
+                                    <outputBaseDir>target/site/models</outputBaseDir>\r
+                                </generator>\r
+                            </codeGenerators>\r
+                            <inspectDependencies>true</inspectDependencies>\r
+                        </configuration>\r
+                    </execution>\r
+                </executions>\r
+            </plugin>\r
+            <plugin>\r
+                <groupId>org.codehaus.mojo</groupId>\r
+                <artifactId>build-helper-maven-plugin</artifactId>\r
+                <version>1.8</version>\r
+                <executions>\r
+                    <execution>\r
+                        <id>add-source</id>\r
+                        <phase>generate-sources</phase>\r
+                        <goals>\r
+                            <goal>add-source</goal>\r
+                        </goals>\r
+                        <configuration>\r
+                            <sources>\r
+                                <source>${project.build.directory}/generated-sources/config</source>\r
+                            </sources>\r
+                        </configuration>\r
+                    </execution>\r
+                </executions>\r
+            </plugin>\r
+        </plugins>\r
+    </build>\r
+</project>\r
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModule.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModule.java
new file mode 100644 (file)
index 0000000..1c2b78a
--- /dev/null
@@ -0,0 +1,75 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.config.yang.messagebus.app.impl;
+
+import org.opendaylight.controller.config.api.DependencyResolver;
+import org.opendaylight.controller.config.api.ModuleIdentifier;
+import org.opendaylight.controller.mdsal.InitializationContext;
+import org.opendaylight.controller.mdsal.Providers;
+import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class MessageBusAppImplModule extends org.opendaylight.controller.config.yang.messagebus.app.impl.AbstractMessageBusAppImplModule {
+    private static final Logger LOGGER = LoggerFactory.getLogger(MessageBusAppImplModule.class);
+
+    private BundleContext bundleContext;
+
+    public BundleContext getBundleContext() {
+        return bundleContext;
+    }
+
+    public void setBundleContext(BundleContext bundleContext) {
+        this.bundleContext = bundleContext;
+    }
+
+    public MessageBusAppImplModule( ModuleIdentifier identifier, DependencyResolver dependencyResolver) {
+        super(identifier, dependencyResolver);
+    }
+
+    public MessageBusAppImplModule( ModuleIdentifier identifier,
+                                    DependencyResolver dependencyResolver,
+                                    MessageBusAppImplModule oldModule,
+                                    java.lang.AutoCloseable oldInstance) {
+        super(identifier, dependencyResolver, oldModule, oldInstance);
+    }
+
+    @Override
+    protected void customValidation() {}
+
+    @Override
+    public java.lang.AutoCloseable createInstance() {
+        List<NamespaceToStream> namespaceMapping = getNamespaceToStream();
+        InitializationContext ic = new InitializationContext(namespaceMapping);
+
+        final Providers.BindingAware bap = new Providers.BindingAware(ic);
+        final Providers.BindingIndependent bip = new Providers.BindingIndependent(ic);
+
+        getBindingBrokerDependency().registerProvider(bap, getBundleContext());
+        getDomBrokerDependency().registerProvider(bip);
+
+        AutoCloseable closer = new AutoCloseable() {
+            @Override public void close()  {
+                closeProvider(bap);
+                closeProvider(bip);
+            }
+        };
+
+        return closer;
+    }
+
+    private void closeProvider(AutoCloseable closable) {
+        try {
+            closable.close();
+        } catch (Exception e) {
+            LOGGER.error("Exception while closing: {}\n Exception: {}", closable, e);
+        }
+    }
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleFactory.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleFactory.java
new file mode 100644 (file)
index 0000000..8bee2d1
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+* Generated file
+*
+* Generated from: yang module name: message-bus-app-impl yang module local name: messagebus-app-impl
+* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
+* Generated at: Tue Feb 03 09:03:11 CET 2015
+*
+* Do not modify this file unless it is present under src/main directory
+*/
+package org.opendaylight.controller.config.yang.messagebus.app.impl;
+
+import org.opendaylight.controller.config.api.DependencyResolver;
+import org.opendaylight.controller.config.api.DynamicMBeanWithInstance;
+import org.opendaylight.controller.config.spi.Module;
+import org.osgi.framework.BundleContext;
+
+public class MessageBusAppImplModuleFactory extends org.opendaylight.controller.config.yang.messagebus.app.impl.AbstractMessageBusAppImplModuleFactory {
+    @Override
+    public Module createModule(String instanceName,
+                               DependencyResolver dependencyResolver,
+                               BundleContext bundleContext) {
+
+        MessageBusAppImplModule module =
+                (MessageBusAppImplModule) super.createModule(instanceName,
+                        dependencyResolver,
+                        bundleContext);
+
+        module.setBundleContext(bundleContext);
+
+        return module;
+    }
+
+    @Override
+    public Module createModule(String instanceName,
+                               DependencyResolver dependencyResolver,
+                               DynamicMBeanWithInstance old,
+                               BundleContext bundleContext)
+            throws Exception {
+
+        MessageBusAppImplModule module =
+                (MessageBusAppImplModule) super.createModule(instanceName,
+                        dependencyResolver,
+                        old,
+                        bundleContext);
+
+        module.setBundleContext(bundleContext);
+
+        return module;
+    }
+
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/DataStore.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/DataStore.java
new file mode 100644 (file)
index 0000000..a881fac
--- /dev/null
@@ -0,0 +1,80 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.mdsal;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+public class DataStore {
+    private static final FutureCallback<Void> DEFAULT_CALLBACK =
+            new FutureCallback<Void>() {
+                public void onSuccess(Void result) {
+                    // TODO: Implement default behaviour
+                }
+
+                public void onFailure(Throwable t) {
+                    // TODO: Implement default behaviour
+                };
+            };
+
+    private final MdSAL mdSAL;
+
+    public DataStore(MdSAL mdSAL) {
+        this.mdSAL = mdSAL;
+    }
+
+    public ListenerRegistration<DataChangeListener> registerDataChangeListener(LogicalDatastoreType store,
+                                                                               InstanceIdentifier<?> path,
+                                                                               DataChangeListener listener,
+                                                                               AsyncDataBroker.DataChangeScope triggeringScope) {
+        return mdSAL.getDataBroker().registerDataChangeListener(store, path, listener, triggeringScope);
+    }
+
+    public <T extends DataObject> void asyncPUT(LogicalDatastoreType datastoreType,
+                                                InstanceIdentifier<T> path,
+                                                T data) {
+        asyncPUT(datastoreType, path, data, DEFAULT_CALLBACK);
+    }
+
+    public <T extends DataObject> void asyncPUT(LogicalDatastoreType datastoreType,
+                                                InstanceIdentifier<T> path,
+                                                T data,
+                                                FutureCallback<Void> callback) {
+        WriteTransaction tx = mdSAL.getDataBroker().newWriteOnlyTransaction();
+        tx.put(datastoreType, path, data, true);
+        execPut(tx, callback);
+    }
+
+    public <T extends DataObject> T read(LogicalDatastoreType datastoreType,
+                                         InstanceIdentifier<T> path) {
+
+        ReadOnlyTransaction tx = mdSAL.getDataBroker().newReadOnlyTransaction();
+        T result = null;
+
+        try {
+            result = tx.read(datastoreType, path).get().get();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        return result;
+    }
+
+    private static void execPut(WriteTransaction tx, FutureCallback<Void> callback) {
+        Futures.addCallback(tx.submit(), callback);
+    }
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/InitializationContext.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/InitializationContext.java
new file mode 100644 (file)
index 0000000..c73fb2a
--- /dev/null
@@ -0,0 +1,61 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.mdsal;
+
+import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream;
+import org.opendaylight.controller.messagebus.app.impl.EventAggregator;
+import org.opendaylight.controller.messagebus.app.impl.EventSourceManager;
+import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class InitializationContext {
+    private static final Logger LOGGER = LoggerFactory.getLogger(InitializationContext.class);
+
+    private final MdSAL mdSal;
+    private final DataStore dataStore;
+    private final EventSourceTopology eventSourceTopology;
+    private final EventSourceManager eventSourceManager;
+    private final EventAggregator eventAggregator;
+
+    public InitializationContext(List<NamespaceToStream> namespaceMapping) {
+        this.mdSal = new MdSAL();
+        this.dataStore = new DataStore(mdSal);
+        this.eventSourceTopology = new EventSourceTopology(dataStore);
+        this.eventSourceManager = new EventSourceManager(dataStore, mdSal, eventSourceTopology, namespaceMapping);
+        this.eventAggregator = new EventAggregator(mdSal, eventSourceTopology);
+    }
+
+    public synchronized void set(BindingAwareBroker.ProviderContext session) {
+        mdSal.setBindingAwareContext(session);
+
+        if (mdSal.isReady()) {
+            initialize();
+        }
+    }
+
+    public synchronized void set(Broker.ProviderSession session) {
+        mdSal.setBindingIndependentContext(session);
+
+        if (mdSal.isReady()) {
+            initialize();
+        }
+    }
+
+    private void initialize() {
+        eventSourceTopology.mdsalReady();
+        eventSourceManager.mdsalReady();
+        eventAggregator.mdsalReady();
+
+        LOGGER.info("InitializationContext started.");
+    }
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/MdSAL.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/MdSAL.java
new file mode 100644 (file)
index 0000000..03b220a
--- /dev/null
@@ -0,0 +1,188 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.mdsal;
+
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.BindingAwareService;
+import org.opendaylight.controller.sal.binding.api.mount.MountInstance;
+import org.opendaylight.controller.sal.binding.api.mount.MountProviderService;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.sal.core.api.BrokerService;
+import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
+import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService;
+import org.opendaylight.controller.sal.core.api.notify.NotificationService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.RpcService;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MdSAL {
+    private static final Logger LOGGER = LoggerFactory.getLogger(MdSAL.class);
+
+    private BindingAwareBroker.ProviderContext bindingAwareContext;
+    private Broker.ProviderSession bindingIndependentContext;
+
+    // -----------------------------
+    // ----- FRAMEWORK METHODS -----
+    // -----------------------------
+    public void setBindingAwareContext(BindingAwareBroker.ProviderContext bindingAwareContext) {
+        this.bindingAwareContext = bindingAwareContext;
+    }
+
+    public void setBindingIndependentContext(Broker.ProviderSession bindingIndependentContext) {
+        this.bindingIndependentContext = bindingIndependentContext;
+    }
+
+    //TODO: We should hide brokers and expose functionalities instead
+    public DataBroker getDataBroker() {
+        return getBaSalService(DataBroker.class);
+    }
+
+    public synchronized boolean isReady() {
+        return (bindingAwareContext != null && bindingIndependentContext != null);
+    }
+
+    // -----------------------
+    // ----- API METHODS -----
+    // -----------------------
+    // TODO: Factor out API methods to interface
+    // method does not return registration object. Rather will hold references internally and manipulate using node id and API
+    public <T extends RpcService> void addRpcImplementation(Class<T> serviceInterface,
+                                                            T implementation)
+            throws IllegalStateException {
+        bindingAwareContext.addRpcImplementation(serviceInterface, implementation);
+    }
+
+    // method does not return registration object. Rather will hold references internally and manipulate using node id and API
+    public <T extends RpcService> void addRpcImplementation(Node node,
+                                                            Class<T> serviceInterface,
+                                                            T implementation)
+            throws IllegalStateException {
+        BindingAwareBroker.RoutedRpcRegistration<T> registration
+                = addRoutedRpcImplementation(serviceInterface, implementation);
+
+        NodeRef nodeRef = createNodeRef(node.getId());
+        registration.registerPath(NodeContext.class, nodeRef.getValue());
+    }
+
+    public ListenerRegistration<NotificationListener> addNotificationListener(String nodeId,
+                                                                              QName notification,
+                                                                              NotificationListener listener) {
+        YangInstanceIdentifier yii = inventoryNodeBIIdentifier(nodeId);
+
+        NotificationService notificationService =
+                getBiSalService(DOMMountPointService.class)
+                        .getMountPoint(yii)
+                        .get()
+                        .getService(NotificationPublishService.class)
+                        .get();
+
+        ListenerRegistration<NotificationListener> registration =
+                notificationService.addNotificationListener(notification, listener);
+
+        LOGGER.info("Notification listener registered for {}, at node {}", notification, nodeId);
+
+        return registration;
+    }
+
+    public ListenerRegistration<NotificationListener> addNotificationListener(QName notification,
+                                                                              NotificationListener listener) {
+        NotificationService notificationService =
+                getBiSalService(NotificationPublishService.class);
+
+        ListenerRegistration<NotificationListener> registration =
+                notificationService.addNotificationListener(notification, listener);
+
+        LOGGER.info("Notification listener registered for {}.", notification);
+
+        return registration;
+    }
+
+    public <T extends RpcService> T getRpcService(Class<T> serviceInterface) {
+        return bindingAwareContext.getRpcService(serviceInterface);
+    }
+
+    public <T extends RpcService> T getRpcService(String nodeId, Class<T> serviceInterface) {
+        MountProviderService mountProviderService = getBaSalService(MountProviderService.class);
+
+        InstanceIdentifier<Node> key = InstanceIdentifier.create(Nodes.class)
+                                                         .child(Node.class,
+                                                                 new NodeKey(new NodeId(nodeId)));
+
+        MountInstance mountPoint = mountProviderService.getMountPoint(key);
+        return mountPoint.getRpcService(serviceInterface);
+    }
+
+    public void publishNotification(CompositeNode notification) {
+        getBiSalService(NotificationPublishService.class).publish(notification);
+    }
+
+    public SchemaContext getSchemaContext(String nodeId) {
+        YangInstanceIdentifier yii = inventoryNodeBIIdentifier(nodeId);
+
+        SchemaContext schemaContext =
+                getBiSalService(DOMMountPointService.class)
+                        .getMountPoint(yii)
+                        .get().getSchemaContext();
+
+        return schemaContext;
+    }
+
+    // ---------------------------
+    // ----- UTILITY METHODS -----
+    // ---------------------------
+    private <T extends BindingAwareService> T getBaSalService(Class<T> service) {
+        return bindingAwareContext.getSALService(service);
+    }
+
+    private <T extends BrokerService> T getBiSalService(Class<T> service) {
+        return bindingIndependentContext.getService(service);
+    }
+
+    private static final String NODE_ID_NAME = "id";
+
+    public static YangInstanceIdentifier inventoryNodeBIIdentifier(String nodeId) {
+        return YangInstanceIdentifier.builder()
+                .node(Nodes.QNAME)
+                .nodeWithKey(Node.QNAME,
+                             QName.create(Node.QNAME.getNamespace(),
+                                          Node.QNAME.getRevision(),
+                                          NODE_ID_NAME),
+                             nodeId)
+                .build();
+    }
+
+    private <T extends RpcService> BindingAwareBroker.RoutedRpcRegistration<T> addRoutedRpcImplementation(Class<T> serviceInterface,
+                                                                                                          T implementation)
+            throws IllegalStateException {
+        return bindingAwareContext.addRoutedRpcImplementation(serviceInterface, implementation);
+    }
+
+    public static NodeRef createNodeRef(NodeId nodeId) {
+        NodeKey nodeKey = new NodeKey(nodeId);
+        InstanceIdentifier<Node> path = InstanceIdentifier
+                .builder(Nodes.class)
+                .child(Node.class, nodeKey)
+                .build();
+        return new NodeRef(path);
+    }
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/Providers.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/mdsal/Providers.java
new file mode 100644 (file)
index 0000000..a28e588
--- /dev/null
@@ -0,0 +1,57 @@
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.mdsal;
+
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
+import org.opendaylight.controller.sal.core.api.AbstractProvider;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Providers {
+    private static final Logger LOGGER = LoggerFactory.getLogger(Providers.class);
+
+    public static class BindingAware implements BindingAwareProvider, AutoCloseable {
+        private final InitializationContext initializationContext;
+
+        public BindingAware(InitializationContext ic) {
+            this.initializationContext = ic;
+        }
+
+        @Override
+        public void onSessionInitiated(BindingAwareBroker.ProviderContext session) {
+            initializationContext.set(session);
+
+            LOGGER.info("BindingAwareBroker.ProviderContext initialized");
+        }
+
+        @Override
+        public void close() throws Exception {}
+    }
+
+    public static class BindingIndependent extends AbstractProvider implements AutoCloseable {
+        private final InitializationContext initializationContext;
+
+        public BindingIndependent(InitializationContext ic) {
+            this.initializationContext = ic;
+        }
+
+        @Override
+        public void onSessionInitiated(Broker.ProviderSession session) {
+            initializationContext.set(session);
+
+            LOGGER.info("Broker.ProviderSession initialized");
+        }
+
+        @Override
+        public void close() throws Exception {}
+    }
+
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventAggregator.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventAggregator.java
new file mode 100644 (file)
index 0000000..4b77bf2
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.messagebus.app.impl;
+
+import java.util.List;
+import java.util.concurrent.Future;
+import org.opendaylight.controller.mdsal.MdSAL;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO: implement topic created notification
+public class EventAggregator implements EventAggregatorService {
+    private static final Logger LOGGER = LoggerFactory.getLogger(EventAggregator.class);
+
+    private final MdSAL mdSAL;
+    private final EventSourceTopology eventSourceTopology;
+
+    public EventAggregator(final MdSAL mdSAL, final EventSourceTopology eventSourceTopology) {
+        this.mdSAL = mdSAL;
+        this.eventSourceTopology = eventSourceTopology;
+    }
+
+    public void mdsalReady() {
+        mdSAL.addRpcImplementation(EventAggregatorService.class, this);
+    }
+
+    @Override
+    public Future<RpcResult<CreateTopicOutput>> createTopic(final CreateTopicInput input) {
+        LOGGER.info("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}",
+                input.getNotificationPattern(),
+                input.getNodeIdPattern());
+
+        Topic topic = new Topic(new NotificationPattern(input.getNotificationPattern()), input.getNodeIdPattern().getValue(), mdSAL);
+
+        //# Make sure we capture all nodes from now on
+        eventSourceTopology.registerDataChangeListener(topic);
+
+        //# Notify existing nodes
+        //# Code reader note: Context of Node type is NetworkTopology
+        List<Node> nodes = eventSourceTopology.snapshot();
+        for (Node node : nodes) {
+            NodeId nodeIdToNotify = node.getAugmentation(Node1.class).getEventSourceNode();
+            topic.notifyNode(nodeIdToNotify);
+        }
+
+        CreateTopicOutput cto = new CreateTopicOutputBuilder()
+                .setTopicId(topic.getTopicId())
+                .build();
+
+        return Util.resultFor(cto);
+    }
+
+    @Override
+    public Future<RpcResult<Void>> destroyTopic(final DestroyTopicInput input) {
+        // 1. UNREGISTER DATA CHANGE LISTENER -> ?
+        // 2. CLOSE TOPIC
+        return null;
+    }
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceManager.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceManager.java
new file mode 100644 (file)
index 0000000..a84eddd
--- /dev/null
@@ -0,0 +1,126 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.messagebus.app.impl;
+
+import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.mdsal.DataStore;
+import org.opendaylight.controller.mdsal.MdSAL;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public final class EventSourceManager implements DataChangeListener {
+    private static final Logger LOGGER = LoggerFactory.getLogger(EventSourceManager.class);
+    private static final InstanceIdentifier<Node> INVENTORY_PATH = InstanceIdentifier.create(Nodes.class)
+                                                                                     .child(Node.class);
+    private final DataStore dataStore;
+    private final MdSAL mdSal;
+    private final EventSourceTopology eventSourceTopology;
+    private final Map<String, String> streamMap;
+
+    public EventSourceManager(DataStore dataStore,
+                              MdSAL mdSal,
+                              EventSourceTopology eventSourceTopology,
+                              List<NamespaceToStream> namespaceMapping) {
+        this.dataStore = dataStore;
+        this.mdSal = mdSal;
+        this.eventSourceTopology = eventSourceTopology;
+        this.streamMap = namespaceToStreamMapping(namespaceMapping);
+    }
+
+    private Map namespaceToStreamMapping(List<NamespaceToStream> namespaceMapping) {
+        Map<String, String> streamMap = new HashMap<>(namespaceMapping.size());
+
+        for (NamespaceToStream nToS  : namespaceMapping) {
+            streamMap.put(nToS.getUrnPrefix(), nToS.getStreamName());
+        }
+
+        return streamMap;
+    }
+
+    public void mdsalReady() {
+        dataStore.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
+                                             INVENTORY_PATH,
+                                             this,
+                                             DataBroker.DataChangeScope.SUBTREE);
+
+        LOGGER.info("EventSourceManager initialized.");
+    }
+
+    @Override
+    public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
+        //FIXME: Prevent creating new event source on subsequent changes in inventory, like disconnect.
+        LOGGER.debug("[DataChangeEvent<InstanceIdentifier<?>, DataObject>: {}]", event);
+
+        Node node = Util.getAffectedNode(event);
+        // we listen on node tree, therefore we should rather throw IllegalStateException when node is null
+        if ( node == null ) {
+            LOGGER.debug("OnDataChanged Event. Node is null.");
+            return;
+        }
+        if ( isNetconfNode(node) == false ) {
+            LOGGER.debug("OnDataChanged Event. Not a Netconf node.");
+            return;
+        }
+        if ( isEventSource(node) == false ) {
+            LOGGER.debug("OnDataChanged Event. Node an EventSource node.");
+            return;
+        }
+
+        NetconfEventSource netconfEventSource = new NetconfEventSource(mdSal,
+                                                                       node.getKey().getId().getValue(),
+                                                                       streamMap);
+        mdSal.addRpcImplementation(node, EventSourceService.class, netconfEventSource);
+
+        InstanceIdentifier<NetconfNode> nodeInstanceIdentifier =
+                InstanceIdentifier.create(Nodes.class)
+                        .child(Node.class, node.getKey())
+                        .augmentation(NetconfNode.class);
+
+        dataStore.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL,
+                nodeInstanceIdentifier,
+                netconfEventSource,
+                DataBroker.DataChangeScope.SUBTREE);
+
+        eventSourceTopology.insert(node);
+    }
+
+    private boolean isNetconfNode(Node node)  {
+        return node.getAugmentation(NetconfNode.class) != null ;
+    }
+
+    public boolean isEventSource(Node node) {
+        NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
+
+        return isEventSource(netconfNode);
+    }
+
+    private boolean isEventSource(NetconfNode node) {
+        for (String capability : node.getInitialCapability()) {
+            if(capability.startsWith("urn:ietf:params:xml:ns:netconf:notification")) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java
new file mode 100644 (file)
index 0000000..c070097
--- /dev/null
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.messagebus.app.impl;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.mdsal.DataStore;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1Builder;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.TopologyTypes1Builder;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSource;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.topology.event.source.type.TopologyEventSourceBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class EventSourceTopology {
+    private static final Logger LOGGER = LoggerFactory.getLogger(EventSourceTopology.class);
+
+    private static final String topologyId = "EVENT-SOURCE-TOPOLOGY" ;
+    private static final TopologyKey topologyKey = new TopologyKey(new TopologyId(topologyId));
+    private static final LogicalDatastoreType datastoreType = LogicalDatastoreType.OPERATIONAL;
+
+    private static final InstanceIdentifier<Topology> topologyInstanceIdentifier =
+            InstanceIdentifier.create(NetworkTopology.class)
+                    .child(Topology.class, topologyKey);
+
+    private static final InstanceIdentifier<TopologyTypes1> topologyTypeInstanceIdentifier =
+            topologyInstanceIdentifier
+                    .child(TopologyTypes.class)
+                    .augmentation(TopologyTypes1.class);
+
+    private static final InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang
+                                            .network.topology.rev131021.network.topology.topology.Node> eventSourceTopologyPath =
+            InstanceIdentifier.create(NetworkTopology.class)
+                    .child(Topology.class)
+                    .child(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang
+                            .network.topology.rev131021.network.topology.topology.Node.class);
+
+    private final Map<DataChangeListener, ListenerRegistration<DataChangeListener>> registrations =
+            new ConcurrentHashMap<>();
+
+    private final DataStore dataStore;
+
+    public EventSourceTopology(DataStore dataStore) {
+        this.dataStore = dataStore;
+    }
+
+    public void mdsalReady() {
+        TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
+        TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
+
+        dataStore.asyncPUT(datastoreType, topologyTypeInstanceIdentifier, topologyTypeAugment);
+    }
+
+    public void insert(Node node) {
+        String nodeId = node.getKey().getId().getValue();
+        NodeKey nodeKey = new NodeKey(new NodeId(nodeId));
+        InstanceIdentifier<Node1> topologyNodeAugment
+                = topologyInstanceIdentifier
+                .child(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang
+                        .network.topology.rev131021.network.topology.topology.Node.class, nodeKey)
+                .augmentation(Node1.class);
+
+        Node1 nodeAgument = new Node1Builder().setEventSourceNode(node.getId()).build();
+        dataStore.asyncPUT(datastoreType, topologyNodeAugment, nodeAgument);
+    }
+
+    // TODO: Should we expose this functioanlity over RPC?
+    public List<org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang
+                .network.topology.rev131021.network.topology.topology.Node> snapshot() {
+        Topology topology = dataStore.read(datastoreType, topologyInstanceIdentifier);
+        return topology.getNode();
+    }
+
+    public void registerDataChangeListener(DataChangeListener listener) {
+        ListenerRegistration<DataChangeListener> listenerRegistration = dataStore.registerDataChangeListener(datastoreType,
+                eventSourceTopologyPath,
+                listener,
+                DataBroker.DataChangeScope.SUBTREE);
+
+        registrations.put(listener, listenerRegistration);
+    }
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSource.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSource.java
new file mode 100644 (file)
index 0000000..9c0697f
--- /dev/null
@@ -0,0 +1,190 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.messagebus.app.impl;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.regex.Pattern;
+import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.mdsal.MdSAL;
+import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
+import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NetconfEventSource implements EventSourceService, NotificationListener, DataChangeListener {
+    private static final Logger LOGGER = LoggerFactory.getLogger(NetconfEventSource.class);
+
+    private final MdSAL mdSal;
+    private final String nodeId;
+
+    private final List<String> activeStreams = new ArrayList<>();
+
+    private final Map<String, String> urnPrefixToStreamMap;
+
+    public NetconfEventSource(final MdSAL mdSal, final String nodeId, final Map<String, String> streamMap) {
+        Preconditions.checkNotNull(mdSal);
+        Preconditions.checkNotNull(nodeId);
+
+        this.mdSal = mdSal;
+        this.nodeId = nodeId;
+        this.urnPrefixToStreamMap = streamMap;
+
+        LOGGER.info("NetconfEventSource [{}] created.", nodeId);
+    }
+
+    @Override
+    public Future<RpcResult<JoinTopicOutput>> joinTopic(final JoinTopicInput input) {
+        final NotificationPattern notificationPattern = input.getNotificationPattern();
+
+        // FIXME: default language should already be regex
+        final String regex = Util.wildcardToRegex(notificationPattern.getValue());
+
+        final Pattern pattern = Pattern.compile(regex);
+        List<QName> matchingNotifications = Util.expandQname(availableNotifications(), pattern);
+        registerNotificationListener(matchingNotifications);
+        return null;
+    }
+
+    private List<QName> availableNotifications() {
+        // FIXME: use SchemaContextListener to get changes asynchronously
+        Set<NotificationDefinition> availableNotifications = mdSal.getSchemaContext(nodeId).getNotifications();
+        List<QName> qNs = new ArrayList<>(availableNotifications.size());
+        for (NotificationDefinition nd : availableNotifications) {
+            qNs.add(nd.getQName());
+        }
+
+        return qNs;
+    }
+
+    private void registerNotificationListener(final List<QName> notificationsToSubscribe) {
+        for (QName qName : notificationsToSubscribe) {
+            startSubscription(qName);
+            // FIXME: do not lose this registration
+            final ListenerRegistration<NotificationListener> reg = mdSal.addNotificationListener(nodeId, qName, this);
+        }
+    }
+
+    private synchronized void startSubscription(final QName qName) {
+        String streamName = resolveStream(qName);
+
+        if (streamIsActive(streamName) == false) {
+            LOGGER.info("Stream {} is not active on node {}. Will subscribe.", streamName, nodeId);
+            startSubscription(streamName);
+        }
+    }
+
+    private synchronized void resubscribeToActiveStreams() {
+        for (String streamName : activeStreams) {
+            startSubscription(streamName);
+        }
+    }
+
+    private synchronized void startSubscription(final String streamName) {
+        CreateSubscriptionInput subscriptionInput = getSubscriptionInput(streamName);
+        mdSal.getRpcService(nodeId, NotificationsService.class).createSubscription(subscriptionInput);
+        activeStreams.add(streamName);
+    }
+
+    private static CreateSubscriptionInput getSubscriptionInput(final String streamName) {
+        CreateSubscriptionInputBuilder csib = new CreateSubscriptionInputBuilder();
+        csib.setStream(new StreamNameType(streamName));
+        return csib.build();
+    }
+
+    private String resolveStream(final QName qName) {
+        String streamName = null;
+
+        for (Map.Entry<String, String> entry : urnPrefixToStreamMap.entrySet()) {
+            String nameSpace = qName.getNamespace().toString();
+            String urnPrefix = entry.getKey();
+            if( nameSpace.startsWith(urnPrefix) ) {
+                streamName = entry.getValue();
+                break;
+            }
+        }
+
+        return streamName;
+    }
+
+    private boolean streamIsActive(final String streamName) {
+        return activeStreams.contains(streamName);
+    }
+
+    // PASS
+    @Override public Set<QName> getSupportedNotifications() {
+        return null;
+    }
+
+    @Override
+    public void onNotification(final CompositeNode notification) {
+        LOGGER.info("NetconfEventSource {} received notification {}. Will publish to MD-SAL.", nodeId, notification);
+        ImmutableCompositeNode payload = ImmutableCompositeNode.builder()
+                .setQName(QName.create(TopicNotification.QNAME, "payload"))
+                .add(notification).toInstance();
+        ImmutableCompositeNode icn = ImmutableCompositeNode.builder()
+                .setQName(TopicNotification.QNAME)
+                .add(payload)
+                .addLeaf("event-source", nodeId)
+                .toInstance();
+
+        mdSal.publishNotification(icn);
+    }
+
+    @Override
+    public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
+        boolean wasConnected = false;
+        boolean nowConnected = false;
+
+        for (Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : change.getOriginalData().entrySet()) {
+            if ( isNetconfNode(changeEntry) ) {
+                NetconfNode nn = (NetconfNode)changeEntry.getValue();
+                wasConnected = nn.isConnected();
+            }
+        }
+
+        for (Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : change.getUpdatedData().entrySet()) {
+            if ( isNetconfNode(changeEntry) ) {
+                NetconfNode nn = (NetconfNode)changeEntry.getValue();
+                nowConnected = nn.isConnected();
+            }
+        }
+
+        if (wasConnected == false && nowConnected == true) {
+            resubscribeToActiveStreams();
+        }
+    }
+
+    private static boolean isNetconfNode(final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry )  {
+        return NetconfNode.class.equals(changeEntry.getKey().getTargetType());
+    }
+
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Topic.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Topic.java
new file mode 100644 (file)
index 0000000..aebde0c
--- /dev/null
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.messagebus.app.impl;
+
+import com.google.common.base.Preconditions;
+import java.util.regex.Pattern;
+import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.mdsal.MdSAL;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.LoggerFactory;
+
+public class Topic implements DataChangeListener {
+    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(Topic.class);
+    private final NotificationPattern notificationPattern;
+    private final Pattern nodeIdPattern;
+    private final TopicId topicId;
+    private final MdSAL mdSal;
+
+    public Topic(final NotificationPattern notificationPattern, final String nodeIdPattern, final MdSAL mdSal) {
+        this.notificationPattern = Preconditions.checkNotNull(notificationPattern);
+
+        // FIXME: regex should be the language of nodeIdPattern
+        final String regex = Util.wildcardToRegex(nodeIdPattern);
+        this.nodeIdPattern = Pattern.compile(regex);
+        this.mdSal = Preconditions.checkNotNull(mdSal);
+
+        // FIXME: We need to perform some salting in order to make
+        //        the topic IDs less predictable.
+        this.topicId = new TopicId(Util.md5String(notificationPattern + nodeIdPattern));
+    }
+
+    public TopicId getTopicId() {
+        return topicId;
+    }
+
+    @Override
+    public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
+        // TODO: affected must return topologyNode !!!
+        final Node node = Util.getAffectedNode(event);
+        if (nodeIdPattern.matcher(node.getId().getValue()).matches()) {
+            notifyNode(node.getId());
+        } else {
+            LOG.debug("Skipping node {}", node.getId());
+        }
+    }
+
+    public void notifyNode(final NodeId nodeId) {
+        JoinTopicInput jti = getJoinTopicInputArgument(nodeId);
+        EventSourceService ess = mdSal.getRpcService(EventSourceService.class);
+
+        if (ess == null) {
+            throw new IllegalStateException("EventSourceService is not registered.");
+        }
+
+        ess.joinTopic(jti);
+    }
+
+    private JoinTopicInput getJoinTopicInputArgument(final NodeId nodeId) {
+        NodeRef nodeRef = MdSAL.createNodeRef(nodeId);
+        JoinTopicInput jti =
+                new JoinTopicInputBuilder()
+                        .setNode(nodeRef.getValue())
+                        .setTopicId(topicId)
+                        .setNotificationPattern(notificationPattern)
+                        .build();
+        return jti;
+    }
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Util.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Util.java
new file mode 100644 (file)
index 0000000..9927d85
--- /dev/null
@@ -0,0 +1,137 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.messagebus.app.impl;
+
+import com.google.common.util.concurrent.Futures;
+import java.math.BigInteger;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.regex.Pattern;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+public final class Util {
+    private static final MessageDigest messageDigestTemplate = getDigestInstance();
+
+    private static MessageDigest getDigestInstance() {
+        try {
+            return MessageDigest.getInstance("MD5");
+        } catch (NoSuchAlgorithmException e) {
+            throw new RuntimeException("Unable to get MD5 instance");
+        }
+    }
+
+    public static String md5String(final String inputString) {
+
+        try {
+            MessageDigest md = (MessageDigest)messageDigestTemplate.clone();
+            md.update(inputString.getBytes("UTF-8"), 0, inputString.length());
+            return new BigInteger(1, md.digest()).toString(16);
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to get MD5 instance");
+        }
+    }
+
+    public static <T> Future<RpcResult<T>> resultFor(final T output) {
+        RpcResult<T> result = Rpcs.getRpcResult(true, output, Collections.<RpcError>emptyList());
+        return Futures.immediateFuture(result);
+    }
+
+    /**
+     * Extracts affected node from data change event.
+     * @param event
+     * @return
+     */
+    public static Node getAffectedNode(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
+        // TODO: expect listener method to be called even when change impact node
+        // TODO: test with change.getCreatedData()
+        for (Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getUpdatedData().entrySet()) {
+            if (isNode(changeEntry)) {
+                return (Node) changeEntry.getValue();
+            }
+        }
+
+        return null;
+    }
+
+    private static boolean isNode(final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry )  {
+        return Node.class.equals(changeEntry.getKey().getTargetType());
+    }
+
+    /**
+     * Method filters qnames based on wildcard strings
+     *
+     * @param availableQnames
+     * @param patterh matching pattern
+     * @return list of filtered qnames
+     */
+    public static List<QName> expandQname(final List<QName> availableQnames, final Pattern pattern) {
+        List<QName> matchingQnames = new ArrayList<>();
+
+        for (QName qname : availableQnames) {
+            String namespace = qname.getNamespace().toString();
+            if (pattern.matcher(namespace).matches()) {
+                matchingQnames.add(qname);
+            }
+        }
+
+        return matchingQnames;
+    }
+
+    /**
+     * CREDIT to http://www.rgagnon.com/javadetails/java-0515.html
+     * @param wildcard
+     * @return
+     */
+    static String wildcardToRegex(final String wildcard){
+        StringBuffer s = new StringBuffer(wildcard.length());
+        s.append('^');
+        for (char c : wildcard.toCharArray()) {
+            switch(c) {
+                case '*':
+                    s.append(".*");
+                    break;
+                case '?':
+                    s.append('.');
+                    break;
+                // escape special regexp-characters
+                case '(':
+                case ')':
+                case '[':
+                case ']':
+                case '$':
+                case '^':
+                case '.':
+                case '{':
+                case '}':
+                case '|':
+                case '\\':
+                    s.append("\\");
+                    s.append(c);
+                    break;
+                default:
+                    s.append(c);
+                    break;
+            }
+        }
+        s.append('$');
+        return s.toString();
+    }
+}
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/yang/messagebus-app-impl.yang b/opendaylight/md-sal/messagebus-impl/src/main/yang/messagebus-app-impl.yang
new file mode 100644 (file)
index 0000000..bed6b10
--- /dev/null
@@ -0,0 +1,64 @@
+module messagebus-app-impl {
+    yang-version 1;
+    namespace "urn:opendaylight:params:xml:ns:yang:controller:messagebus:app:impl";
+    prefix "binding-impl";
+
+    import config { prefix config; revision-date 2013-04-05; }
+    import opendaylight-md-sal-binding {prefix sal;}
+    import opendaylight-md-sal-dom {prefix dom;}
+
+
+    description
+        "Service definition for Message Bus application implementation.";
+    revision "2015-02-03" {
+        description "Second revision. Message Bus opensourcing";
+    }
+
+    identity messagebus-app-impl {
+        base config:module-type;
+        config:java-name-prefix MessageBusAppImpl;
+    }
+    
+    augment "/config:modules/config:module/config:configuration" {
+        case messagebus-app-impl {
+            when "/config:modules/config:module/config:type = 'messagebus-app-impl'";
+            
+            container binding-broker {
+                uses config:service-ref {
+                    refine type {
+                        mandatory true;
+                        config:required-identity sal:binding-broker-osgi-registry;
+                    }
+                }
+            }
+
+            container dom-broker {
+                uses config:service-ref {
+                    refine type {
+                        mandatory true;
+                        config:required-identity dom:dom-broker-osgi-registry;
+                    }
+                }
+            }
+
+            list namespace-to-stream {
+                key urn-prefix;
+
+                leaf urn-prefix {
+                    type string;
+                }
+
+                leaf stream-name {
+                    type string;
+                }
+            }
+        }
+    }
+    
+    augment "/config:modules/config:module/config:state" {
+        case messagebus-app-impl {
+            when "/config:modules/config:module/config:type = 'messagebus-app-impl'";
+        }
+    }
+}
\ No newline at end of file
index 1743b62..bdeb8a6 100644 (file)
 
     <!-- Clustering -->
     <module>sal-remoterpc-connector</module>
+
+    <!-- Message Bus -->
+    <module>messagebus-api</module>
+    <module>messagebus-impl</module>
   </modules>
 
   <build>
index d1c3fef..e2aa169 100644 (file)
@@ -41,7 +41,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     }
 
     protected int adjustedIndex(long logEntryIndex) {
-        if(snapshotIndex < 0){
+        if (snapshotIndex < 0) {
             return (int) logEntryIndex;
         }
         return (int) (logEntryIndex - (snapshotIndex + 1));
@@ -134,6 +134,11 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
        return journal.size();
     }
 
+    @Override
+    public int dataSize() {
+        return dataSize;
+    }
+
     @Override
     public boolean isPresent(long logEntryIndex) {
         if (logEntryIndex > lastIndex()) {
@@ -200,6 +205,11 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         previousSnapshotIndex = -1;
         previousSnapshotTerm = -1;
         dataSize = 0;
+        // need to recalc the datasize based on the entries left after precommit.
+        for(ReplicatedLogEntry logEntry : journal) {
+            dataSize += logEntry.size();
+        }
+
     }
 
     @Override
index 78a1335..fd49737 100644 (file)
@@ -75,7 +75,7 @@ public interface ConfigParams {
      * The interval in which the leader needs to check itself if its isolated
      * @return FiniteDuration
      */
-    FiniteDuration getIsolatedCheckInterval();
+    long getIsolatedCheckIntervalInMillis();
 
 
     /**
index 86867e1..3e6742c 100644 (file)
@@ -42,8 +42,7 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     private FiniteDuration heartBeatInterval = HEART_BEAT_INTERVAL;
     private long snapshotBatchCount = SNAPSHOT_BATCH_COUNT;
     private int journalRecoveryLogBatchSize = JOURNAL_RECOVERY_LOG_BATCH_SIZE;
-    private FiniteDuration isolatedLeaderCheckInterval =
-        new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000, HEART_BEAT_INTERVAL.unit());
+    private long isolatedLeaderCheckInterval = HEART_BEAT_INTERVAL.$times(1000).toMillis();
 
     // 12 is just an arbitrary percentage. This is the amount of the total memory that a raft actor's
     // in-memory journal can use before it needs to snapshot
@@ -68,7 +67,7 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     }
 
     public void setIsolatedLeaderCheckInterval(FiniteDuration isolatedLeaderCheckInterval) {
-        this.isolatedLeaderCheckInterval = isolatedLeaderCheckInterval;
+        this.isolatedLeaderCheckInterval = isolatedLeaderCheckInterval.toMillis();
     }
 
     public void setElectionTimeoutFactor(long electionTimeoutFactor){
@@ -112,7 +111,7 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     }
 
     @Override
-    public FiniteDuration getIsolatedCheckInterval() {
+    public long getIsolatedCheckIntervalInMillis() {
         return isolatedLeaderCheckInterval;
     }
 
index 935c4f0..04b9f16 100644 (file)
@@ -100,4 +100,16 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
     public long timeSinceLastActivity() {
         return stopwatch.elapsed(TimeUnit.MILLISECONDS);
     }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("FollowerLogInformationImpl [id=").append(id).append(", nextIndex=").append(nextIndex)
+                .append(", matchIndex=").append(matchIndex).append(", stopwatch=")
+                .append(stopwatch.elapsed(TimeUnit.MILLISECONDS))
+                .append(", followerTimeoutMillis=").append(followerTimeoutMillis).append("]");
+        return builder.toString();
+    }
+
+
 }
index 1f446c7..3ec8cc5 100644 (file)
@@ -22,6 +22,7 @@ import com.google.common.base.Stopwatch;
 import com.google.protobuf.ByteString;
 import java.io.Serializable;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
@@ -31,16 +32,13 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
-import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
-import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,6 +83,9 @@ import org.slf4j.LoggerFactory;
  * </ul>
  */
 public abstract class RaftActor extends AbstractUntypedPersistentActor {
+
+    private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
+
     protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
     /**
@@ -281,6 +282,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         if (message instanceof ApplyState){
             ApplyState applyState = (ApplyState) message;
 
+            long elapsedTime = (System.nanoTime() - applyState.getStartTime());
+            if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){
+                LOG.warn("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
+                        TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
+            }
+
             if(LOG.isDebugEnabled()) {
                 LOG.debug("{}: Applying state for log index {} data {}",
                     persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
@@ -358,13 +365,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
 
         } else {
-            if (!(message instanceof AppendEntriesMessages.AppendEntries)
-                && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug("{}: onReceiveCommand: message: {}", persistenceId(), message.getClass());
-                }
-            }
-
             RaftActorBehavior oldBehavior = currentBehavior;
             currentBehavior = currentBehavior.handleMessage(getSender(), message);
 
@@ -573,7 +573,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     /**
      * This method is called during recovery to reconstruct the state of the actor.
      *
-     * @param snapshot A snapshot of the state of the actor
+     * @param snapshotBytes A snapshot of the state of the actor
      */
     protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
 
@@ -668,12 +668,21 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
 
-        //be greedy and remove entries from in-mem journal which are in the snapshot
-        // and update snapshotIndex and snapshotTerm without waiting for the success,
+        long dataThreshold = Runtime.getRuntime().totalMemory() *
+                getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+        if (context.getReplicatedLog().dataSize() > dataThreshold) {
+            // if memory is less, clear the log based on lastApplied.
+            // this could/should only happen if one of the followers is down
+            // as normally we keep removing from the log when its replicated to all.
+            context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
+                    captureSnapshot.getLastAppliedTerm());
 
-        context.getReplicatedLog().snapshotPreCommit(
-            captureSnapshot.getLastAppliedIndex(),
-            captureSnapshot.getLastAppliedTerm());
+        } else {
+            // clear the log based on replicatedToAllIndex
+            context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
+                    captureSnapshot.getReplicatedToAllTerm());
+        }
+        getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
 
         LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
             "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
@@ -717,13 +726,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             // FIXME: Maybe this should be done after the command is saved
             journal.subList(adjustedIndex , journal.size()).clear();
 
-            persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){
+            persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>() {
 
-                @Override public void apply(DeleteEntries param)
-                    throws Exception {
+                @Override
+                public void apply(DeleteEntries param)
+                        throws Exception {
                     //FIXME : Doing nothing for now
                     dataSize = 0;
-                    for(ReplicatedLogEntry entry : journal){
+                    for (ReplicatedLogEntry entry : journal) {
                         dataSize += entry.size();
                     }
                 }
@@ -735,11 +745,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             appendAndPersist(replicatedLogEntry, null);
         }
 
-        @Override
-        public int dataSize() {
-            return dataSize;
-        }
-
         public void appendAndPersist(
             final ReplicatedLogEntry replicatedLogEntry,
             final Procedure<ReplicatedLogEntry> callback)  {
@@ -766,7 +771,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                         long dataSizeForCheck = dataSize;
 
                         dataSizeSinceLastSnapshot += logEntrySize;
-                        long journalSize = lastIndex()+1;
+                        long journalSize = lastIndex() + 1;
 
                         if(!hasFollowers()) {
                             // When we do not have followers we do not maintain an in-memory log
@@ -793,7 +798,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
                             dataSizeSinceLastSnapshot = 0;
 
-                            LOG.info("{}: Initiating Snapshot Capture..", persistenceId());
+                            LOG.info("{}: Initiating Snapshot Capture, journalSize = {}, dataSizeForCheck = {}," +
+                                " dataThreshold = {}", persistenceId(), journalSize, dataSizeForCheck, dataThreshold);
+
                             long lastAppliedIndex = -1;
                             long lastAppliedTerm = -1;
 
@@ -817,12 +824,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                             }
 
                             // send a CaptureSnapshot to self to make the expensive operation async.
-                            getSelf().tell(new CaptureSnapshot(
-                                lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
+                            long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex();
+                            ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
+                            getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
+                                (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
+                                (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1)),
                                 null);
                             context.setSnapshotCaptureInitiated(true);
                         }
-                        if(callback != null){
+                        if (callback != null){
                             callback.apply(replicatedLogEntry);
                         }
                     }
index 80b7ad9..82d0839 100644 (file)
@@ -145,14 +145,14 @@ public interface ReplicatedLog {
      * sets snapshot term
      * @param snapshotTerm
      */
-    public void setSnapshotTerm(long snapshotTerm);
+    void setSnapshotTerm(long snapshotTerm);
 
     /**
      * Clears the journal entries with startIndex(inclusive) and endIndex (exclusive)
      * @param startIndex
      * @param endIndex
      */
-    public void clear(int startIndex, int endIndex);
+    void clear(int startIndex, int endIndex);
 
     /**
      * Handles all the bookkeeping in order to perform a rollback in the
@@ -160,20 +160,21 @@ public interface ReplicatedLog {
      * @param snapshotCapturedIndex
      * @param snapshotCapturedTerm
      */
-    public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm);
+    void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm);
 
     /**
      * Sets the Replicated log to state after snapshot success.
      */
-    public void snapshotCommit();
+    void snapshotCommit();
 
     /**
      * Restores the replicated log to a state in the event of a save snapshot failure
      */
-    public void snapshotRollback();
+    void snapshotRollback();
 
     /**
      * Size of the data in the log (in bytes)
      */
-    public int dataSize();
+    int dataSize();
+
 }
index 0a7a632..9299e75 100644 (file)
@@ -9,21 +9,22 @@
 package org.opendaylight.controller.cluster.raft.base.messages;
 
 import akka.actor.ActorRef;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-
 import java.io.Serializable;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 
 public class ApplyState implements Serializable {
     private static final long serialVersionUID = 1L;
     private final ActorRef clientActor;
     private final String identifier;
     private final ReplicatedLogEntry replicatedLogEntry;
+    private final long startTime;
 
     public ApplyState(ActorRef clientActor, String identifier,
         ReplicatedLogEntry replicatedLogEntry) {
         this.clientActor = clientActor;
         this.identifier = identifier;
         this.replicatedLogEntry = replicatedLogEntry;
+        this.startTime = System.nanoTime();
     }
 
     public ActorRef getClientActor() {
@@ -37,4 +38,17 @@ public class ApplyState implements Serializable {
     public ReplicatedLogEntry getReplicatedLogEntry() {
         return replicatedLogEntry;
     }
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    @Override
+    public String toString() {
+        return "ApplyState{" +
+                "identifier='" + identifier + '\'' +
+                ", replicatedLogEntry.index =" + replicatedLogEntry.getIndex() +
+                ", startTime=" + startTime +
+                '}';
+    }
 }
index d4dd335..a96b1e4 100644 (file)
@@ -14,19 +14,23 @@ public class CaptureSnapshot {
     private long lastIndex;
     private long lastTerm;
     private boolean installSnapshotInitiated;
+    private long replicatedToAllIndex;
+    private long replicatedToAllTerm;
 
     public CaptureSnapshot(long lastIndex, long lastTerm,
-        long lastAppliedIndex, long lastAppliedTerm) {
-        this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, false);
+        long lastAppliedIndex, long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm) {
+        this(lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, replicatedToAllIndex , replicatedToAllTerm, false);
     }
 
     public CaptureSnapshot(long lastIndex, long lastTerm,long lastAppliedIndex,
-        long lastAppliedTerm, boolean installSnapshotInitiated) {
+        long lastAppliedTerm, long replicatedToAllIndex, long replicatedToAllTerm, boolean installSnapshotInitiated) {
         this.lastIndex = lastIndex;
         this.lastTerm = lastTerm;
         this.lastAppliedIndex = lastAppliedIndex;
         this.lastAppliedTerm = lastAppliedTerm;
         this.installSnapshotInitiated = installSnapshotInitiated;
+        this.replicatedToAllIndex = replicatedToAllIndex;
+        this.replicatedToAllTerm = replicatedToAllTerm;
     }
 
     public long getLastAppliedIndex() {
@@ -48,4 +52,12 @@ public class CaptureSnapshot {
     public boolean isInstallSnapshotInitiated() {
         return installSnapshotInitiated;
     }
+
+    public long getReplicatedToAllIndex() {
+        return replicatedToAllIndex;
+    }
+
+    public long getReplicatedToAllTerm() {
+        return replicatedToAllTerm;
+    }
 }
index 9e4f3b4..be51ba0 100644 (file)
@@ -91,10 +91,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     private Optional<ByteString> snapshot;
 
-    private long replicatedToAllIndex = -1;
-
     public AbstractLeader(RaftActorContext context) {
-        super(context);
+        super(context, RaftState.Leader);
 
         final Builder<String, FollowerLogInformation> ftlBuilder = ImmutableMap.builder();
         for (String followerId : context.getPeerAddresses().keySet()) {
@@ -109,7 +107,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         leaderId = context.getId();
 
-        LOG.debug("{}: Election: Leader has following peers: {}", context.getId(), getFollowerIds());
+        LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds());
 
         minReplicationCount = getMajorityVoteCount(getFollowerIds().size());
 
@@ -127,7 +125,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         // Upon election: send initial empty AppendEntries RPCs
         // (heartbeat) to each server; repeat during idle periods to
         // prevent election timeouts (§5.2)
-        sendAppendEntries(0);
+        sendAppendEntries(0, false);
+
+        // It is important to schedule this heartbeat here
+        scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
     }
 
     /**
@@ -139,10 +140,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return followerToLog.keySet();
     }
 
-    private Optional<ByteString> getSnapshot() {
-        return snapshot;
-    }
-
     @VisibleForTesting
     void setSnapshot(Optional<ByteString> snapshot) {
         this.snapshot = snapshot;
@@ -152,9 +149,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     protected RaftActorBehavior handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries) {
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
-        }
+        LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
 
         return this;
     }
@@ -163,10 +158,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
         AppendEntriesReply appendEntriesReply) {
 
-        if(! appendEntriesReply.isSuccess()) {
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: handleAppendEntriesReply: {}", context.getId(), appendEntriesReply);
-            }
+        if(LOG.isTraceEnabled()) {
+            LOG.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply);
+        } else if(LOG.isDebugEnabled() && !appendEntriesReply.isSuccess()) {
+            LOG.debug("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply);
         }
 
         // Update the FollowerLogInformation
@@ -175,10 +170,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             followerToLog.get(followerId);
 
         if(followerLogInformation == null){
-            LOG.error("{}: handleAppendEntriesReply - unknown follower {}", context.getId(), followerId);
+            LOG.error("{}: handleAppendEntriesReply - unknown follower {}", logName(), followerId);
             return this;
         }
 
+        if(followerLogInformation.timeSinceLastActivity() >
+                context.getConfigParams().getElectionTimeOutInterval().toMillis()) {
+            LOG.error("{} : handleAppendEntriesReply delayed beyond election timeout, " +
+                            "appendEntriesReply : {}, timeSinceLastActivity : {}, lastApplied : {}, commitIndex : {}",
+                    logName(), appendEntriesReply, followerLogInformation.timeSinceLastActivity(),
+                    context.getLastApplied(), context.getCommitIndex());
+        }
+
         followerLogInformation.markFollowerActive();
 
         if (appendEntriesReply.isSuccess()) {
@@ -223,6 +226,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         // Apply the change to the state machine
         if (context.getCommitIndex() > context.getLastApplied()) {
+            LOG.debug("{}: handleAppendEntriesReply: applying to log - commitIndex: {}, lastAppliedIndex: {}",
+                    logName(), context.getCommitIndex(), context.getLastApplied());
+
             applyLogToStateMachine(context.getCommitIndex());
         }
 
@@ -231,20 +237,21 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         }
 
         //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
-        sendUpdatesToFollower(followerId, followerLogInformation, false);
+        sendUpdatesToFollower(followerId, followerLogInformation, false, false);
         return this;
     }
 
     private void purgeInMemoryLog() {
-        //find the lowest index across followers which has been replicated to all. -1 if there are no followers.
+        //find the lowest index across followers which has been replicated to all.
+        // lastApplied if there are no followers, so that we keep clearing the log for single-node
         // we would delete the in-mem log from that index on, in-order to minimize mem usage
         // we would also share this info thru AE with the followers so that they can delete their log entries as well.
-        long minReplicatedToAllIndex = followerToLog.isEmpty() ? -1 : Long.MAX_VALUE;
+        long minReplicatedToAllIndex = followerToLog.isEmpty() ? context.getLastApplied() : Long.MAX_VALUE;
         for (FollowerLogInformation info : followerToLog.values()) {
             minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex());
         }
 
-        replicatedToAllIndex = fakeSnapshot(minReplicatedToAllIndex, replicatedToAllIndex);
+        super.performSnapshotWithoutCapture(minReplicatedToAllIndex);
     }
 
     @Override
@@ -277,10 +284,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return this;
     }
 
-    @Override
-    public RaftState state() {
-        return RaftState.Leader;
-    }
+    protected void beforeSendHeartbeat(){}
 
     @Override
     public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
@@ -294,8 +298,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             // set currentTerm = T, convert to follower (§5.1)
             // This applies to all RPC messages and responses
             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
-                LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {}", context.getId(),
-                        rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+                LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
+                        logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
 
                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
 
@@ -303,37 +307,38 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             }
         }
 
-        try {
-            if (message instanceof SendHeartBeat) {
-                sendHeartBeat();
-                return this;
+        if (message instanceof SendHeartBeat) {
+            beforeSendHeartbeat();
+            sendHeartBeat();
+            scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
+            return this;
 
-            } else if(message instanceof SendInstallSnapshot) {
-                // received from RaftActor
-                setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
-                sendInstallSnapshot();
+        } else if(message instanceof SendInstallSnapshot) {
+            // received from RaftActor
+            setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
+            sendInstallSnapshot();
 
-            } else if (message instanceof Replicate) {
-                replicate((Replicate) message);
+        } else if (message instanceof Replicate) {
+            replicate((Replicate) message);
 
-            } else if (message instanceof InstallSnapshotReply){
-                handleInstallSnapshotReply((InstallSnapshotReply) message);
+        } else if (message instanceof InstallSnapshotReply){
+            handleInstallSnapshotReply((InstallSnapshotReply) message);
 
-            }
-        } finally {
-            scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
         }
 
+
         return super.handleMessage(sender, message);
     }
 
     private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
+        LOG.debug("{}: handleInstallSnapshotReply: {}", logName(), reply);
+
         String followerId = reply.getFollowerId();
         FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
 
         if (followerToSnapshot == null) {
             LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader",
-                    context.getId(), followerId);
+                    logName(), followerId);
             return;
         }
 
@@ -347,8 +352,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     //this was the last chunk reply
                     if(LOG.isDebugEnabled()) {
                         LOG.debug("{}: InstallSnapshotReply received, " +
-                                "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}",
-                                context.getId(), reply.getChunkIndex(), followerId,
+                                "last chunk received, Chunk: {}. Follower: {} Setting nextIndex: {}",
+                                logName(), reply.getChunkIndex(), followerId,
                             context.getReplicatedLog().getSnapshotIndex() + 1
                         );
                     }
@@ -359,10 +364,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         context.getReplicatedLog().getSnapshotIndex() + 1);
                     mapFollowerToSnapshot.remove(followerId);
 
-                    if(LOG.isDebugEnabled()) {
-                        LOG.debug("{}: followerToLog.get(followerId).getNextIndex()=" +
-                                context.getId(), followerToLog.get(followerId).getNextIndex());
-                    }
+                    LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
+                                logName(), followerId, followerLogInformation.getMatchIndex(),
+                                followerLogInformation.getNextIndex());
 
                     if (mapFollowerToSnapshot.isEmpty()) {
                         // once there are no pending followers receiving snapshots
@@ -376,7 +380,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 }
             } else {
                 LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
-                        context.getId(), reply.getChunkIndex());
+                        logName(), reply.getChunkIndex());
 
                 followerToSnapshot.markSendStatus(false);
             }
@@ -390,7 +394,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         } else {
             LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
-                    context.getId(), reply.getChunkIndex(), followerId,
+                    logName(), reply.getChunkIndex(), followerId,
                     followerToSnapshot.getChunkIndex());
 
             if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
@@ -404,9 +408,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private void replicate(Replicate replicate) {
         long logIndex = replicate.getReplicatedLogEntry().getIndex();
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: Replicate message {}", context.getId(), logIndex);
-        }
+        LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}", logName(),
+                replicate.getIdentifier(), logIndex);
 
         // Create a tracker entry we will use this later to notify the
         // client actor
@@ -420,11 +423,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             context.setCommitIndex(logIndex);
             applyLogToStateMachine(logIndex);
         } else {
-            sendAppendEntries(0);
+            sendAppendEntries(0, false);
         }
     }
 
-    private void sendAppendEntries(long timeSinceLastActivityInterval) {
+    private void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) {
         // Send an AppendEntries to all followers
         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
             final String followerId = e.getKey();
@@ -432,7 +435,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             // This checks helps not to send a repeat message to the follower
             if(!followerLogInformation.isFollowerActive() ||
                     followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) {
-                sendUpdatesToFollower(followerId, followerLogInformation, true);
+                sendUpdatesToFollower(followerId, followerLogInformation, true, isHeartbeat);
             }
         }
     }
@@ -445,7 +448,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      */
 
     private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation,
-                                          boolean sendHeartbeat) {
+                                       boolean sendHeartbeat, boolean isHeartbeat) {
 
         ActorSelection followerActor = context.getPeerActorSelection(followerId);
         if (followerActor != null) {
@@ -464,25 +467,33 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             } else {
                 long leaderLastIndex = context.getReplicatedLog().lastIndex();
                 long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
-                if (isFollowerActive &&
-                    context.getReplicatedLog().isPresent(followerNextIndex)) {
+
+                if(!isHeartbeat || LOG.isTraceEnabled()) {
+                    LOG.debug("{}: Checking sendAppendEntries for follower {}, leaderLastIndex: {}, leaderSnapShotIndex: {}",
+                            logName(), followerId, leaderLastIndex, leaderSnapShotIndex);
+                }
+
+                if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) {
+
+                    LOG.debug("{}: sendAppendEntries: {} is present for follower {}", logName(),
+                            followerNextIndex, followerId);
+
                     // FIXME : Sending one entry at a time
                     final List<ReplicatedLogEntry> entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
 
                     sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
 
                 } else if (isFollowerActive && followerNextIndex >= 0 &&
-                    leaderLastIndex >= followerNextIndex) {
+                    leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) {
                     // if the followers next index is not present in the leaders log, and
                     // if the follower is just not starting and if leader's index is more than followers index
                     // then snapshot should be sent
 
                     if (LOG.isDebugEnabled()) {
-                        LOG.debug("InitiateInstallSnapshot to follower:{}," +
-                                "follower-nextIndex:{}, leader-snapshot-index:{},  " +
-                                "leader-last-index:{}", followerId,
-                            followerNextIndex, leaderSnapShotIndex, leaderLastIndex
-                        );
+                        LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," +
+                                    "follower-nextIndex: %d, leader-snapshot-index: %d,  " +
+                                    "leader-last-index: %d", logName(), followerId,
+                                    followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
                     }
 
                     // Send heartbeat to follower whenever install snapshot is initiated.
@@ -507,10 +518,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
             prevLogIndex(followerNextIndex),
             prevLogTerm(followerNextIndex), entries,
-            context.getCommitIndex(), replicatedToAllIndex);
+            context.getCommitIndex(), super.getReplicatedToAllIndex());
 
-        if(!entries.isEmpty()) {
-            LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId,
+        if(!entries.isEmpty() || LOG.isTraceEnabled()) {
+            LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerId,
                     appendEntries);
         }
 
@@ -518,7 +529,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     /**
-     * /**
      * Install Snapshot works as follows
      * 1. Leader initiates the capture snapshot by sending a CaptureSnapshot message to actor
      * 2. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
@@ -533,10 +543,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      * @param followerNextIndex
      */
     private void initiateCaptureSnapshot(String followerId, long followerNextIndex) {
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: initiateCaptureSnapshot, followers {}", context.getId(), followerToLog.keySet());
-        }
-
         if (!context.getReplicatedLog().isPresent(followerNextIndex) &&
                 context.getReplicatedLog().isInSnapshot(followerNextIndex)) {
 
@@ -549,7 +555,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
             } else if (!context.isSnapshotCaptureInitiated()) {
 
-                LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", context.getId(), getLeaderId());
+                LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", logName(), getLeaderId());
                 ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied());
                 long lastAppliedIndex = -1;
                 long lastAppliedTerm = -1;
@@ -557,15 +563,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 if (lastAppliedEntry != null) {
                     lastAppliedIndex = lastAppliedEntry.getIndex();
                     lastAppliedTerm = lastAppliedEntry.getTerm();
-                } else if (context.getReplicatedLog().getSnapshotIndex() > -1)  {
+                } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
                     lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
                     lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
                 }
 
                 boolean isInstallSnapshotInitiated = true;
-                actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
-                                lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
-                        actor());
+                long replicatedToAllIndex = super.getReplicatedToAllIndex();
+                ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
+                actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
+                    (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
+                    (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1),
+                    isInstallSnapshotInitiated), actor());
                 context.setSnapshotCaptureInitiated(true);
             }
         }
@@ -573,6 +582,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
 
     private void sendInstallSnapshot() {
+        LOG.debug("{}: sendInstallSnapshot", logName());
         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
             ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
 
@@ -605,19 +615,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         context.getReplicatedLog().getSnapshotIndex(),
                         context.getReplicatedLog().getSnapshotTerm(),
                         nextSnapshotChunk,
-                            followerToSnapshot.incrementChunkIndex(),
-                            followerToSnapshot.getTotalChunks(),
+                        followerToSnapshot.incrementChunkIndex(),
+                        followerToSnapshot.getTotalChunks(),
                         Optional.of(followerToSnapshot.getLastChunkHashCode())
                     ).toSerializable(),
                     actor()
                 );
                 LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
-                        context.getId(), followerActor.path(),
+                        logName(), followerActor.path(),
                         followerToSnapshot.getChunkIndex(),
                         followerToSnapshot.getTotalChunks());
             }
         } catch (IOException e) {
-            LOG.error("{}: InstallSnapshot failed for Leader.", context.getId(), e);
+            LOG.error("{}: InstallSnapshot failed for Leader.", logName(), e);
         }
     }
 
@@ -632,15 +642,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             mapFollowerToSnapshot.put(followerId, followerToSnapshot);
         }
         ByteString nextChunk = followerToSnapshot.getNextChunk();
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("{}: Leader's snapshot nextChunk size:{}", context.getId(), nextChunk.size());
-        }
+
+        LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.size());
+
         return nextChunk;
     }
 
     private void sendHeartBeat() {
         if (!followerToLog.isEmpty()) {
-            sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis());
+            LOG.trace("{}: Sending heartbeat", logName());
+            sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis(), true);
         }
     }
 
@@ -715,7 +726,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
             if(LOG.isDebugEnabled()) {
                 LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}",
-                        context.getId(), size, totalChunks);
+                        logName(), size, totalChunks);
             }
             replyReceivedForOffset = -1;
             chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
@@ -783,10 +794,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 }
             }
 
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Next chunk: length={}, offset={},size={}", context.getId(),
+
+            LOG.debug("{}: Next chunk: length={}, offset={},size={}", logName(),
                     snapshotLength, start, size);
-            }
+
             ByteString substring = getSnapshotBytes().substring(start, start + size);
             nextChunkHashCode = substring.hashCode();
             return substring;
index 075b287..0b0b4c7 100644 (file)
@@ -14,6 +14,7 @@ import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
@@ -58,10 +59,37 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      */
     protected String leaderId = null;
 
+    private long replicatedToAllIndex = -1;
 
-    protected AbstractRaftActorBehavior(RaftActorContext context) {
+    private final String logName;
+
+    private final RaftState state;
+
+    protected AbstractRaftActorBehavior(RaftActorContext context, RaftState state) {
         this.context = context;
+        this.state = state;
         this.LOG = context.getLogger();
+
+        logName = String.format("%s (%s)", context.getId(), state);
+    }
+
+    @Override
+    public RaftState state() {
+        return state;
+    }
+
+    public String logName() {
+        return logName;
+    }
+
+    @Override
+    public void setReplicatedToAllIndex(long replicatedToAllIndex) {
+        this.replicatedToAllIndex = replicatedToAllIndex;
+    }
+
+    @Override
+    public long getReplicatedToAllIndex() {
+        return replicatedToAllIndex;
     }
 
     /**
@@ -95,7 +123,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         if (appendEntries.getTerm() < currentTerm()) {
             if(LOG.isDebugEnabled()) {
                 LOG.debug("{}: Cannot append entries because sender term {} is less than {}",
-                        context.getId(), appendEntries.getTerm(), currentTerm());
+                        logName(), appendEntries.getTerm(), currentTerm());
             }
 
             sender.tell(
@@ -132,12 +160,9 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      * @param requestVote
      * @return
      */
-    protected RaftActorBehavior requestVote(ActorRef sender,
-        RequestVote requestVote) {
+    protected RaftActorBehavior requestVote(ActorRef sender, RequestVote requestVote) {
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: Received {}", context.getId(), requestVote);
-        }
+        LOG.debug("{}: In requestVote:  {}", logName(), requestVote);
 
         boolean grantVote = false;
 
@@ -173,7 +198,11 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
             }
         }
 
-        sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor());
+        RequestVoteReply reply = new RequestVoteReply(currentTerm(), grantVote);
+
+        LOG.debug("{}: requestVote returning: {}", logName(), reply);
+
+        sender.tell(reply, actor());
 
         return this;
     }
@@ -351,12 +380,12 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
                 // around as the rest wont be present either
                 LOG.warn(
                         "{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
-                        context.getId(), i, i, index);
+                        logName(), i, i, index);
                 break;
             }
         }
         if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: Setting last applied to {}", context.getId(), newLastApplied);
+            LOG.debug("{}: Setting last applied to {}", logName(), newLastApplied);
         }
         context.setLastApplied(newLastApplied);
 
@@ -390,11 +419,11 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     }
 
     protected RaftActorBehavior switchBehavior(RaftActorBehavior behavior) {
-        LOG.info("{} :- Switching from behavior {} to {}", context.getId(), this.state(), behavior.state());
+        LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), behavior.state());
         try {
             close();
         } catch (Exception e) {
-            LOG.error("{}: Failed to close behavior : {}", context.getId(), this.state(), e);
+            LOG.error("{}: Failed to close behavior : {}", logName(), this.state(), e);
         }
 
         return behavior;
@@ -423,17 +452,28 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
 
     }
 
-    protected long fakeSnapshot(final long minReplicatedToAllIndex, final long currentReplicatedIndex) {
 
+    /**
+     * Performs a snapshot with no capture on the replicated log.
+     * It clears the log from the supplied index or last-applied-1 which ever is minimum.
+     *
+     * @param snapshotCapturedIndex
+     */
+    protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) {
         //  we would want to keep the lastApplied as its used while capturing snapshots
-        long tempMin = Math.min(minReplicatedToAllIndex,
-                (context.getLastApplied() > -1 ? context.getLastApplied() - 1 : -1));
+        long lastApplied = context.getLastApplied();
+        long tempMin = Math.min(snapshotCapturedIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
 
         if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin))  {
-            context.getReplicatedLog().snapshotPreCommit(tempMin, context.getTermInformation().getCurrentTerm());
+            LOG.debug("{}: fakeSnapshot purging log to {} for term {}", logName(), tempMin,
+                    context.getTermInformation().getCurrentTerm());
+
+            //use the term of the temp-min, since we check for isPresent, entry will not be null
+            ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin);
+            context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
             context.getReplicatedLog().snapshotCommit();
-            return tempMin;
+            setReplicatedToAllIndex(tempMin);
         }
-        return currentReplicatedIndex;
     }
+
 }
index 09ffe05..b36c41a 100644 (file)
@@ -47,12 +47,12 @@ public class Candidate extends AbstractRaftActorBehavior {
     private final Set<String> peers;
 
     public Candidate(RaftActorContext context) {
-        super(context);
+        super(context, RaftState.Candidate);
 
         peers = context.getPeerAddresses().keySet();
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: Election: Candidate has following peers: {}", context.getId(), peers);
+            LOG.debug("{}: Election: Candidate has following peers: {}", logName(), peers);
         }
 
         votesRequired = getMajorityVoteCount(peers.size());
@@ -65,7 +65,7 @@ public class Candidate extends AbstractRaftActorBehavior {
         AppendEntries appendEntries) {
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
+            LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
         }
 
         return this;
@@ -78,7 +78,10 @@ public class Candidate extends AbstractRaftActorBehavior {
     }
 
     @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
-        RequestVoteReply requestVoteReply) {
+            RequestVoteReply requestVoteReply) {
+
+        LOG.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply,
+                voteCount);
 
         if (requestVoteReply.isVoteGranted()) {
             voteCount++;
@@ -91,10 +94,6 @@ public class Candidate extends AbstractRaftActorBehavior {
         return this;
     }
 
-    @Override public RaftState state() {
-        return RaftState.Candidate;
-    }
-
     @Override
     public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
 
@@ -105,7 +104,7 @@ public class Candidate extends AbstractRaftActorBehavior {
             RaftRPC rpc = (RaftRPC) message;
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: RaftRPC message received {} my term is {}", context.getId(), rpc,
+                LOG.debug("{}: RaftRPC message received {}, my term is {}", logName(), rpc,
                         context.getTermInformation().getCurrentTerm());
             }
 
@@ -120,6 +119,8 @@ public class Candidate extends AbstractRaftActorBehavior {
         }
 
         if (message instanceof ElectionTimeout) {
+            LOG.debug("{}: Received ElectionTimeout", logName());
+
             if (votesRequired == 0) {
                 // If there are no peers then we should be a Leader
                 // We wait for the election timeout to occur before declare
@@ -146,12 +147,10 @@ public class Candidate extends AbstractRaftActorBehavior {
 
         // Increment the election term and vote for self
         long currentTerm = context.getTermInformation().getCurrentTerm();
-        context.getTermInformation().updateAndPersist(currentTerm + 1,
-            context.getId());
+        long newTerm = currentTerm + 1;
+        context.getTermInformation().updateAndPersist(newTerm, context.getId());
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: Starting new term {}", context.getId(), (currentTerm + 1));
-        }
+        LOG.debug("{}: Starting new term {}", logName(), newTerm);
 
         // Request for a vote
         // TODO: Retry request for vote if replies do not arrive in a reasonable
@@ -159,17 +158,17 @@ public class Candidate extends AbstractRaftActorBehavior {
         for (String peerId : peers) {
             ActorSelection peerActor = context.getPeerActorSelection(peerId);
             if(peerActor != null) {
-                peerActor.tell(new RequestVote(
+                RequestVote requestVote = new RequestVote(
                         context.getTermInformation().getCurrentTerm(),
                         context.getId(),
                         context.getReplicatedLog().lastIndex(),
-                        context.getReplicatedLog().lastTerm()),
-                    context.getActor()
-                );
-            }
-        }
+                        context.getReplicatedLog().lastTerm());
 
+                LOG.debug("{}: Sending {} to peer {}", logName(), requestVote, peerId);
 
+                peerActor.tell(requestVote, context.getActor());
+            }
+        }
     }
 
     @Override public void close() throws Exception {
index 8a07887..c799441 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ByteString;
 import java.util.ArrayList;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
@@ -40,7 +39,7 @@ public class Follower extends AbstractRaftActorBehavior {
     private SnapshotTracker snapshotTracker = null;
 
     public Follower(RaftActorContext context) {
-        super(context);
+        super(context, RaftState.Follower);
 
         scheduleElection(electionDuration());
     }
@@ -75,10 +74,11 @@ public class Follower extends AbstractRaftActorBehavior {
     @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
                                                               AppendEntries appendEntries) {
 
-        if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries);
-            }
+        int numLogEntries = appendEntries.getEntries() != null ? appendEntries.getEntries().size() : 0;
+        if(LOG.isTraceEnabled()) {
+            LOG.trace("{}: handleAppendEntries: {}", logName(), appendEntries);
+        } else if(LOG.isDebugEnabled() && numLogEntries > 0) {
+            LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
         }
 
         // TODO : Refactor this method into a bunch of smaller methods
@@ -101,44 +101,31 @@ public class Follower extends AbstractRaftActorBehavior {
         boolean outOfSync = true;
 
         // First check if the logs are in sync or not
-        if (lastIndex() == -1
-                && appendEntries.getPrevLogIndex() != -1) {
+        long lastIndex = lastIndex();
+        if (lastIndex == -1 && appendEntries.getPrevLogIndex() != -1) {
 
             // The follower's log is out of sync because the leader does have
             // an entry at prevLogIndex and this follower has no entries in
             // it's log.
 
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: The followers log is empty and the senders prevLogIndex is {}",
-                        context.getId(), appendEntries.getPrevLogIndex());
-            }
-
-        } else if (lastIndex() > -1
-                && appendEntries.getPrevLogIndex() != -1
-                && !prevEntryPresent) {
+            LOG.debug("{}: The followers log is empty and the senders prevLogIndex is {}",
+                        logName(), appendEntries.getPrevLogIndex());
+        } else if (lastIndex > -1 && appendEntries.getPrevLogIndex() != -1 && !prevEntryPresent) {
 
             // The follower's log is out of sync because the Leader's
             // prevLogIndex entry was not found in it's log
 
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: The log is not empty but the prevLogIndex {} was not found in it",
-                        context.getId(), appendEntries.getPrevLogIndex());
-            }
-
-        } else if (lastIndex() > -1
-                && prevEntryPresent
-                && prevLogTerm != appendEntries.getPrevLogTerm()) {
+            LOG.debug("{}: The log is not empty but the prevLogIndex {} was not found in it",
+                        logName(), appendEntries.getPrevLogIndex());
+        } else if (lastIndex > -1 && prevEntryPresent && prevLogTerm != appendEntries.getPrevLogTerm()) {
 
             // The follower's log is out of sync because the Leader's
             // prevLogIndex entry does exist in the follower's log but it has
             // a different term in it
 
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(
-                        "{}: Cannot append entries because previous entry term {}  is not equal to append entries prevLogTerm {}"
-                        , context.getId(), prevLogTerm
-                        , appendEntries.getPrevLogTerm());
-            }
+            LOG.debug(
+                "{}: Cannot append entries because previous entry term {}  is not equal to append entries prevLogTerm {}",
+                 logName(), prevLogTerm, appendEntries.getPrevLogTerm());
         } else {
             outOfSync = false;
         }
@@ -146,25 +133,19 @@ public class Follower extends AbstractRaftActorBehavior {
         if (outOfSync) {
             // We found that the log was out of sync so just send a negative
             // reply and return
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Follower ({}) is out-of-sync, " +
-                        "so sending negative reply, lastIndex():{}, lastTerm():{}",
-                        context.getId(), context.getId(), lastIndex(), lastTerm()
-                );
-            }
-            sender.tell(
-                new AppendEntriesReply(context.getId(), currentTerm(), false,
-                    lastIndex(), lastTerm()), actor()
-            );
+
+            LOG.debug("{}: Follower is out-of-sync, so sending negative reply, lastIndex: {}, lastTerm: {}",
+                        logName(), lastIndex, lastTerm());
+
+            sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
+                    lastTerm()), actor());
             return this;
         }
 
-        if (appendEntries.getEntries() != null
-            && appendEntries.getEntries().size() > 0) {
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Number of entries to be appended = {}", context.getId(),
+        if (appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
+
+            LOG.debug("{}: Number of entries to be appended = {}", logName(),
                         appendEntries.getEntries().size());
-            }
 
             // 3. If an existing entry conflicts with a new one (same index
             // but different terms), delete the existing entry and all that
@@ -182,80 +163,75 @@ public class Follower extends AbstractRaftActorBehavior {
                         break;
                     }
 
-                    if (newEntry.getTerm() == matchEntry
-                        .getTerm()) {
+                    if (newEntry.getTerm() == matchEntry.getTerm()) {
                         continue;
                     }
 
-                    if(LOG.isDebugEnabled()) {
-                        LOG.debug("{}: Removing entries from log starting at {}", context.getId(),
+                    LOG.debug("{}: Removing entries from log starting at {}", logName(),
                                 matchEntry.getIndex());
-                    }
 
                     // Entries do not match so remove all subsequent entries
-                    context.getReplicatedLog()
-                        .removeFromAndPersist(matchEntry.getIndex());
+                    context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex());
                     break;
                 }
             }
 
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: After cleanup entries to be added from = {}", context.getId(),
-                        (addEntriesFrom + lastIndex()));
-            }
+            lastIndex = lastIndex();
+            LOG.debug("{}: After cleanup entries to be added from = {}", logName(),
+                        (addEntriesFrom + lastIndex));
 
             // 4. Append any new entries not already in the log
-            for (int i = addEntriesFrom;
-                 i < appendEntries.getEntries().size(); i++) {
+            for (int i = addEntriesFrom; i < appendEntries.getEntries().size(); i++) {
+                ReplicatedLogEntry entry = appendEntries.getEntries().get(i);
 
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug("{}: Append entry to log {}", context.getId(),
-                            appendEntries.getEntries().get(i).getData());
-                }
-                context.getReplicatedLog().appendAndPersist(appendEntries.getEntries().get(i));
-            }
+                LOG.debug("{}: Append entry to log {}", logName(), entry.getData());
 
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Log size is now {}", context.getId(), context.getReplicatedLog().size());
+                context.getReplicatedLog().appendAndPersist(entry);
             }
-        }
 
+            LOG.debug("{}: Log size is now {}", logName(), context.getReplicatedLog().size());
+        }
 
         // 5. If leaderCommit > commitIndex, set commitIndex =
         // min(leaderCommit, index of last new entry)
 
+        lastIndex = lastIndex();
         long prevCommitIndex = context.getCommitIndex();
 
-        context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(),
-            context.getReplicatedLog().lastIndex()));
+        context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(), lastIndex));
 
         if (prevCommitIndex != context.getCommitIndex()) {
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Commit index set to {}", context.getId(), context.getCommitIndex());
-            }
+            LOG.debug("{}: Commit index set to {}", logName(), context.getCommitIndex());
         }
 
         // If commitIndex > lastApplied: increment lastApplied, apply
         // log[lastApplied] to state machine (§5.3)
         // check if there are any entries to be applied. last-applied can be equal to last-index
         if (appendEntries.getLeaderCommit() > context.getLastApplied() &&
-            context.getLastApplied() < lastIndex()) {
+            context.getLastApplied() < lastIndex) {
             if(LOG.isDebugEnabled()) {
                 LOG.debug("{}: applyLogToStateMachine, " +
-                        "appendEntries.getLeaderCommit():{}," +
-                        "context.getLastApplied():{}, lastIndex():{}", context.getId(),
-                    appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex()
-                );
+                        "appendEntries.getLeaderCommit(): {}," +
+                        "context.getLastApplied(): {}, lastIndex(): {}", logName(),
+                    appendEntries.getLeaderCommit(), context.getLastApplied(), lastIndex);
             }
 
             applyLogToStateMachine(appendEntries.getLeaderCommit());
         }
 
-        sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true,
-            lastIndex(), lastTerm()), actor());
+        AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
+            lastIndex, lastTerm());
+
+        if(LOG.isTraceEnabled()) {
+            LOG.trace("{}: handleAppendEntries returning : {}", logName(), reply);
+        } else if(LOG.isDebugEnabled() && numLogEntries > 0) {
+            LOG.debug("{}: handleAppendEntries returning : {}", logName(), reply);
+        }
+
+        sender.tell(reply, actor());
 
         if (!context.isSnapshotCaptureInitiated()) {
-            fakeSnapshot(appendEntries.getReplicatedToAllIndex(), appendEntries.getReplicatedToAllIndex());
+            super.performSnapshotWithoutCapture(appendEntries.getReplicatedToAllIndex());
         }
 
         return this;
@@ -271,10 +247,6 @@ public class Follower extends AbstractRaftActorBehavior {
         return this;
     }
 
-    @Override public RaftState state() {
-        return RaftState.Follower;
-    }
-
     @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
 
         Object message = fromSerializableMessage(originalMessage);
@@ -285,11 +257,15 @@ public class Follower extends AbstractRaftActorBehavior {
             // set currentTerm = T, convert to follower (§5.1)
             // This applies to all RPC messages and responses
             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+                LOG.debug("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term",
+                        logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+
                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
             }
         }
 
         if (message instanceof ElectionTimeout) {
+            LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
             return switchBehavior(new Candidate(context));
 
         } else if (message instanceof InstallSnapshot) {
@@ -304,12 +280,10 @@ public class Follower extends AbstractRaftActorBehavior {
 
     private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: InstallSnapshot received by follower " +
-                    "datasize:{} , Chunk:{}/{}", context.getId(), installSnapshot.getData().size(),
-                installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks()
-            );
-        }
+
+        LOG.debug("{}: InstallSnapshot received from leader {}, datasize: {} , Chunk: {}/{}",
+                    logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(),
+                    installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
 
         if(snapshotTracker == null){
             snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks());
@@ -331,18 +305,23 @@ public class Follower extends AbstractRaftActorBehavior {
 
             }
 
-            sender.tell(new InstallSnapshotReply(
-                    currentTerm(), context.getId(), installSnapshot.getChunkIndex(),
-                    true), actor());
+            InstallSnapshotReply reply = new InstallSnapshotReply(
+                    currentTerm(), context.getId(), installSnapshot.getChunkIndex(), true);
+
+            LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
+
+            sender.tell(reply, actor());
 
         } catch (SnapshotTracker.InvalidChunkException e) {
+            LOG.debug("{}: Exception in InstallSnapshot of follower", logName(), e);
 
             sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
                     -1, false), actor());
             snapshotTracker = null;
 
         } catch (Exception e){
-            LOG.error("{}: Exception in InstallSnapshot of follower", context.getId(), e);
+            LOG.error("{}: Exception in InstallSnapshot of follower", logName(), e);
+
             //send reply with success as false. The chunk will be sent again on failure
             sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
                     installSnapshot.getChunkIndex(), false), actor());
@@ -350,14 +329,13 @@ public class Follower extends AbstractRaftActorBehavior {
         }
     }
 
-    @Override public void close() throws Exception {
+    @Override
+    public void close() throws Exception {
         stopElection();
     }
 
     @VisibleForTesting
-    ByteString getSnapshotChunksCollected(){
-        return snapshotTracker != null ? snapshotTracker.getCollectedChunks() : ByteString.EMPTY;
+    SnapshotTracker getSnapshotTracker(){
+        return snapshotTracker;
     }
-
-
 }
index 7a94c0c..ebcdcd4 100644 (file)
@@ -8,12 +8,12 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
-import akka.actor.Cancellable;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
-import scala.concurrent.duration.FiniteDuration;
 
 /**
  * The behavior of a RaftActor when it is in the Leader state
@@ -38,15 +38,12 @@ import scala.concurrent.duration.FiniteDuration;
  * set commitIndex = N (§5.3, §5.4).
  */
 public class Leader extends AbstractLeader {
-    private Cancellable installSnapshotSchedule = null;
-    private Cancellable isolatedLeaderCheckSchedule = null;
+    private static final IsolatedLeaderCheck ISOLATED_LEADER_CHECK = new IsolatedLeaderCheck();
+    private final Stopwatch isolatedLeaderCheck;
 
     public Leader(RaftActorContext context) {
         super(context);
-
-        scheduleIsolatedLeaderCheck(
-            new FiniteDuration(context.getConfigParams().getHeartBeatInterval().length() * 10,
-                context.getConfigParams().getHeartBeatInterval().unit()));
+        isolatedLeaderCheck = Stopwatch.createStarted();
     }
 
     @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
@@ -54,8 +51,9 @@ public class Leader extends AbstractLeader {
 
         if (originalMessage instanceof IsolatedLeaderCheck) {
             if (isLeaderIsolated()) {
-                LOG.info("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
+                LOG.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
                         context.getId(), minIsolatedLeaderPeerCount, leaderId);
+
                 return switchBehavior(new IsolatedLeader(context));
             }
         }
@@ -63,21 +61,17 @@ public class Leader extends AbstractLeader {
         return super.handleMessage(sender, originalMessage);
     }
 
-    protected void stopIsolatedLeaderCheckSchedule() {
-        if (isolatedLeaderCheckSchedule != null && !isolatedLeaderCheckSchedule.isCancelled()) {
-            isolatedLeaderCheckSchedule.cancel();
+    @Override
+    protected void beforeSendHeartbeat(){
+        if(isolatedLeaderCheck.elapsed(TimeUnit.MILLISECONDS) > context.getConfigParams().getIsolatedCheckIntervalInMillis()){
+            context.getActor().tell(ISOLATED_LEADER_CHECK, context.getActor());
+            isolatedLeaderCheck.reset().start();
         }
-    }
 
-    protected void scheduleIsolatedLeaderCheck(FiniteDuration isolatedCheckInterval) {
-        isolatedLeaderCheckSchedule = context.getActorSystem().scheduler().schedule(isolatedCheckInterval, isolatedCheckInterval,
-            context.getActor(), new IsolatedLeaderCheck(),
-            context.getActorSystem().dispatcher(), context.getActor());
     }
 
     @Override
     public void close() throws Exception {
-        stopIsolatedLeaderCheckSchedule();
         super.close();
     }
 
index 064cd8b..b766e0c 100644 (file)
@@ -50,4 +50,16 @@ public interface RaftActorBehavior extends AutoCloseable{
      * @return
      */
     String getLeaderId();
+
+    /**
+     * setting the index of the log entry which is replicated to all nodes
+     * @param replicatedToAllIndex
+     */
+    void setReplicatedToAllIndex(long replicatedToAllIndex);
+
+    /**
+     * getting the index of the log entry which is replicated to all nodes
+     * @return
+     */
+    long getReplicatedToAllIndex();
 }
index 97bcd6a..d2ea0c5 100644 (file)
@@ -110,19 +110,15 @@ public class AppendEntries extends AbstractRaftRPC {
         return replicatedToAllIndex;
     }
 
+
     @Override
     public String toString() {
-        final StringBuilder sb =
-            new StringBuilder("AppendEntries{");
-        sb.append("term=").append(getTerm());
-        sb.append("leaderId='").append(leaderId).append('\'');
-        sb.append(", prevLogIndex=").append(prevLogIndex);
-        sb.append(", prevLogTerm=").append(prevLogTerm);
-        sb.append(", entries=").append(entries);
-        sb.append(", leaderCommit=").append(leaderCommit);
-        sb.append(", replicatedToAllIndex=").append(replicatedToAllIndex);
-        sb.append('}');
-        return sb.toString();
+        StringBuilder builder = new StringBuilder();
+        builder.append("AppendEntries [term=").append(term).append(", leaderId=").append(leaderId)
+                .append(", prevLogIndex=").append(prevLogIndex).append(", prevLogTerm=").append(prevLogTerm)
+                .append(", entries=").append(entries).append(", leaderCommit=").append(leaderCommit)
+                .append(", replicatedToAllIndex=").append(replicatedToAllIndex).append("]");
+        return builder.toString();
     }
 
     public <T extends Object> Object toSerializable() {
index 32ed85b..01fef00 100644 (file)
@@ -59,15 +59,12 @@ public class AppendEntriesReply extends AbstractRaftRPC {
         return followerId;
     }
 
-    @Override public String toString() {
-        final StringBuilder sb =
-            new StringBuilder("AppendEntriesReply{");
-        sb.append("term=").append(term);
-        sb.append(", success=").append(success);
-        sb.append(", logLastIndex=").append(logLastIndex);
-        sb.append(", logLastTerm=").append(logLastTerm);
-        sb.append(", followerId='").append(followerId).append('\'');
-        sb.append('}');
-        return sb.toString();
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("AppendEntriesReply [term=").append(term).append(", success=").append(success)
+                .append(", logLastIndex=").append(logLastIndex).append(", logLastTerm=").append(logLastTerm)
+                .append(", followerId=").append(followerId).append("]");
+        return builder.toString();
     }
 }
index 6337f8f..13636f3 100644 (file)
@@ -73,6 +73,7 @@ public class InstallSnapshot extends AbstractRaftRPC {
 
     public <T extends Object> Object toSerializable(){
         InstallSnapshotMessages.InstallSnapshot.Builder builder = InstallSnapshotMessages.InstallSnapshot.newBuilder()
+                .setTerm(this.getTerm())
                 .setLeaderId(this.getLeaderId())
                 .setChunkIndex(this.getChunkIndex())
                 .setData(this.getData())
@@ -102,4 +103,15 @@ public class InstallSnapshot extends AbstractRaftRPC {
 
         return installSnapshot;
     }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("InstallSnapshot [term=").append(term).append(", leaderId=").append(leaderId)
+                .append(", lastIncludedIndex=").append(lastIncludedIndex).append(", lastIncludedTerm=")
+                .append(lastIncludedTerm).append(", data=").append(data).append(", chunkIndex=").append(chunkIndex)
+                .append(", totalChunks=").append(totalChunks).append(", lastChunkHashCode=").append(lastChunkHashCode)
+                .append("]");
+        return builder.toString();
+    }
 }
index 15621bf..77efa53 100644 (file)
@@ -36,4 +36,12 @@ public class InstallSnapshotReply extends AbstractRaftRPC {
     public boolean isSuccess() {
         return success;
     }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("InstallSnapshotReply [term=").append(term).append(", followerId=").append(followerId)
+                .append(", chunkIndex=").append(chunkIndex).append(", success=").append(success).append("]");
+        return builder.toString();
+    }
 }
index 9ba5acb..8f162ae 100644 (file)
@@ -64,14 +64,12 @@ public class RequestVote extends AbstractRaftRPC {
         this.lastLogTerm = lastLogTerm;
     }
 
-    @Override public String toString() {
-        final StringBuilder sb =
-            new StringBuilder("RequestVote{");
-        sb.append("term='").append(getTerm()).append('\'');
-        sb.append("candidateId='").append(candidateId).append('\'');
-        sb.append(", lastLogIndex=").append(lastLogIndex);
-        sb.append(", lastLogTerm=").append(lastLogTerm);
-        sb.append('}');
-        return sb.toString();
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("RequestVote [term=").append(term).append(", candidateId=").append(candidateId)
+                .append(", lastLogIndex=").append(lastLogIndex).append(", lastLogTerm=").append(lastLogTerm)
+                .append("]");
+        return builder.toString();
     }
 }
index b3c95d6..865d4c2 100644 (file)
@@ -27,4 +27,11 @@ public class RequestVoteReply extends AbstractRaftRPC {
     public boolean isVoteGranted() {
         return voteGranted;
     }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("RequestVoteReply [term=").append(term).append(", voteGranted=").append(voteGranted).append("]");
+        return builder.toString();
+    }
 }
index ffd8edf..885c3ab 100644 (file)
@@ -20,6 +20,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockReplicatedLogEntry;
+
 /**
 *
 */
@@ -130,11 +131,17 @@ public class AbstractReplicatedLogImplTest {
 
     @Test
     public void testSnapshotPreCommit() {
+        //add 4 more entries
         replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("E")));
         replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("F")));
         replicatedLogImpl.append(new MockReplicatedLogEntry(3, 6, new MockPayload("G")));
         replicatedLogImpl.append(new MockReplicatedLogEntry(3, 7, new MockPayload("H")));
 
+        //sending negative values should not cause any changes
+        replicatedLogImpl.snapshotPreCommit(-1, -1);
+        assertEquals(8, replicatedLogImpl.size());
+        assertEquals(-1, replicatedLogImpl.getSnapshotIndex());
+
         replicatedLogImpl.snapshotPreCommit(4, 3);
         assertEquals(3, replicatedLogImpl.size());
         assertEquals(4, replicatedLogImpl.getSnapshotIndex());
@@ -152,7 +159,31 @@ public class AbstractReplicatedLogImplTest {
         assertEquals(0, replicatedLogImpl.size());
         assertEquals(7, replicatedLogImpl.getSnapshotIndex());
 
+    }
+
+    @Test
+    public void testIsPresent() {
+        assertTrue(replicatedLogImpl.isPresent(0));
+        assertTrue(replicatedLogImpl.isPresent(1));
+        assertTrue(replicatedLogImpl.isPresent(2));
+        assertTrue(replicatedLogImpl.isPresent(3));
+
+        replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("D")));
+        replicatedLogImpl.snapshotPreCommit(3, 2); //snapshot on 3
+        replicatedLogImpl.snapshotCommit();
+
+        assertFalse(replicatedLogImpl.isPresent(0));
+        assertFalse(replicatedLogImpl.isPresent(1));
+        assertFalse(replicatedLogImpl.isPresent(2));
+        assertFalse(replicatedLogImpl.isPresent(3));
+        assertTrue(replicatedLogImpl.isPresent(4));
+
+        replicatedLogImpl.snapshotPreCommit(4, 2); //snapshot on 4
+        replicatedLogImpl.snapshotCommit();
+        assertFalse(replicatedLogImpl.isPresent(4));
 
+        replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("D")));
+        assertTrue(replicatedLogImpl.isPresent(5));
     }
 
     // create a snapshot for test
index 4d33152..24581d6 100644 (file)
@@ -53,7 +53,7 @@ public class MockRaftActorContext implements RaftActorContext {
              * Identifier of the actor whose election term information this is
              */
             private final String id = id1;
-            private long currentTerm = 0;
+            private long currentTerm = 1;
             private String votedFor = "";
 
             @Override
@@ -88,8 +88,9 @@ public class MockRaftActorContext implements RaftActorContext {
 
     public void initReplicatedLog(){
         this.replicatedLog = new SimpleReplicatedLog();
-        this.replicatedLog.append(new MockReplicatedLogEntry(1, 0, new MockPayload("1")));
-        this.replicatedLog.append(new MockReplicatedLogEntry(1, 1, new MockPayload("2")));
+        long term = getTermInformation().getCurrentTerm();
+        this.replicatedLog.append(new MockReplicatedLogEntry(term, 0, new MockPayload("1")));
+        this.replicatedLog.append(new MockReplicatedLogEntry(term, 1, new MockPayload("2")));
     }
 
     @Override public ActorRef actorOf(Props props) {
@@ -255,6 +256,36 @@ public class MockRaftActorContext implements RaftActorContext {
         public String toString() {
             return value;
         }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + ((value == null) ? 0 : value.hashCode());
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null) {
+                return false;
+            }
+            if (getClass() != obj.getClass()) {
+                return false;
+            }
+            MockPayload other = (MockPayload) obj;
+            if (value == null) {
+                if (other.value != null) {
+                    return false;
+                }
+            } else if (!value.equals(other.value)) {
+                return false;
+            }
+            return true;
+        }
     }
 
     public static class MockReplicatedLogEntry implements ReplicatedLogEntry, Serializable {
@@ -287,6 +318,52 @@ public class MockRaftActorContext implements RaftActorContext {
         public int size() {
             return getData().size();
         }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + ((data == null) ? 0 : data.hashCode());
+            result = prime * result + (int) (index ^ (index >>> 32));
+            result = prime * result + (int) (term ^ (term >>> 32));
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null) {
+                return false;
+            }
+            if (getClass() != obj.getClass()) {
+                return false;
+            }
+            MockReplicatedLogEntry other = (MockReplicatedLogEntry) obj;
+            if (data == null) {
+                if (other.data != null) {
+                    return false;
+                }
+            } else if (!data.equals(other.data)) {
+                return false;
+            }
+            if (index != other.index) {
+                return false;
+            }
+            if (term != other.term) {
+                return false;
+            }
+            return true;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append("MockReplicatedLogEntry [term=").append(term).append(", index=").append(index)
+                    .append(", data=").append(data).append("]");
+            return builder.toString();
+        }
     }
 
     public static class MockReplicatedLogBuilder {
index 59046fd..83868b6 100644 (file)
@@ -552,7 +552,6 @@ public class RaftActorTest extends AbstractActorTest {
                 assertNotEquals("voted for", "foobar", mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
 
                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
-
             }};
     }
 
@@ -576,12 +575,12 @@ public class RaftActorTest extends AbstractActorTest {
 
                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
+                mockRaftActor.waitForInitializeBehaviorComplete();
+
                 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
 
                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
-
             }
-
         };
     }
 
@@ -602,14 +601,14 @@ public class RaftActorTest extends AbstractActorTest {
 
                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
+                mockRaftActor.waitForInitializeBehaviorComplete();
+
                 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
 
                 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
 
                 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
-
             }
-
         };
     }
 
@@ -630,14 +629,14 @@ public class RaftActorTest extends AbstractActorTest {
 
                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
+                mockRaftActor.waitForInitializeBehaviorComplete();
+
                 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
 
                 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
 
                 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
-
             }
-
         };
     }
 
@@ -658,6 +657,8 @@ public class RaftActorTest extends AbstractActorTest {
 
                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
+                mockRaftActor.waitForInitializeBehaviorComplete();
+
                 mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
 
                 verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
@@ -685,13 +686,15 @@ public class RaftActorTest extends AbstractActorTest {
 
                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
+                mockRaftActor.waitForInitializeBehaviorComplete();
+
                 ByteString snapshotBytes = fromObject(Arrays.asList(
                         new MockRaftActorContext.MockPayload("A"),
                         new MockRaftActorContext.MockPayload("B"),
                         new MockRaftActorContext.MockPayload("C"),
                         new MockRaftActorContext.MockPayload("D")));
 
-                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1));
+                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1,-1, 1, -1, 1));
 
                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
 
@@ -722,6 +725,8 @@ public class RaftActorTest extends AbstractActorTest {
 
                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
+                mockRaftActor.waitForInitializeBehaviorComplete();
+
                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, mock(Payload.class)));
                 mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1, 2, mock(Payload.class)));
@@ -737,7 +742,8 @@ public class RaftActorTest extends AbstractActorTest {
                 RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext();
                 mockRaftActor.setCurrentBehavior(new Follower(raftActorContext));
 
-                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1));
+                long replicatedToAllIndex = 1;
+                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1, replicatedToAllIndex, 1));
 
                 verify(mockRaftActor.delegate).createSnapshot();
 
@@ -749,13 +755,16 @@ public class RaftActorTest extends AbstractActorTest {
 
                 verify(dataPersistenceProvider).deleteMessages(100);
 
-                assertEquals(2, mockRaftActor.getReplicatedLog().size());
+                assertEquals(3, mockRaftActor.getReplicatedLog().size());
+                assertEquals(1, mockRaftActor.getCurrentBehavior().getReplicatedToAllIndex());
 
+                assertNotNull(mockRaftActor.getReplicatedLog().get(2));
                 assertNotNull(mockRaftActor.getReplicatedLog().get(3));
                 assertNotNull(mockRaftActor.getReplicatedLog().get(4));
 
                 // Index 2 will not be in the log because it was removed due to snapshotting
-                assertNull(mockRaftActor.getReplicatedLog().get(2));
+                assertNull(mockRaftActor.getReplicatedLog().get(1));
+                assertNull(mockRaftActor.getReplicatedLog().get(0));
 
             }
         };
@@ -779,6 +788,8 @@ public class RaftActorTest extends AbstractActorTest {
 
                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
+                mockRaftActor.waitForInitializeBehaviorComplete();
+
                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
                         new MockRaftActorContext.MockPayload("F"));
 
@@ -807,6 +818,8 @@ public class RaftActorTest extends AbstractActorTest {
 
                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
+                mockRaftActor.waitForInitializeBehaviorComplete();
+
                 ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
 
                 oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
@@ -860,6 +873,8 @@ public class RaftActorTest extends AbstractActorTest {
 
                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
+                mockRaftActor.waitForInitializeBehaviorComplete();
+
                 ByteString snapshotBytes = fromObject(Arrays.asList(
                         new MockRaftActorContext.MockPayload("A"),
                         new MockRaftActorContext.MockPayload("B"),
@@ -870,7 +885,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.setCurrentBehavior(new Leader(raftActorContext));
 
-                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1));
+                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, -1, 1, -1, 1));
 
                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
 
@@ -888,33 +903,44 @@ public class RaftActorTest extends AbstractActorTest {
     public void testRaftRoleChangeNotifier() throws Exception {
         new JavaTestKit(getSystem()) {{
             ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
+            MessageCollectorActor.waitUntilReady(notifierActor);
+
             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+            long heartBeatInterval = 100;
+            config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
+            config.setElectionTimeoutFactor(1);
+
             String persistenceId = factory.generateActorId("notifier-");
 
             factory.createTestActor(MockRaftActor.props(persistenceId,
                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
 
-            // sleeping for a minimum of 2 seconds, if it spans more its fine.
-            Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+            List<RoleChanged> matches =  null;
+            for(int i = 0; i < 5000 / heartBeatInterval; i++) {
+                matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
+                assertNotNull(matches);
+                if(matches.size() == 3) {
+                    break;
+                }
+                Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
+            }
 
-            List<Object> matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
-            assertNotNull(matches);
             assertEquals(3, matches.size());
 
             // check if the notifier got a role change from null to Follower
-            RoleChanged raftRoleChanged = (RoleChanged) matches.get(0);
+            RoleChanged raftRoleChanged = matches.get(0);
             assertEquals(persistenceId, raftRoleChanged.getMemberId());
             assertNull(raftRoleChanged.getOldRole());
             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
 
             // check if the notifier got a role change from Follower to Candidate
-            raftRoleChanged = (RoleChanged) matches.get(1);
+            raftRoleChanged = matches.get(1);
             assertEquals(persistenceId, raftRoleChanged.getMemberId());
             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
 
             // check if the notifier got a role change from Candidate to Leader
-            raftRoleChanged = (RoleChanged) matches.get(2);
+            raftRoleChanged = matches.get(2);
             assertEquals(persistenceId, raftRoleChanged.getMemberId());
             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
             assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
@@ -940,11 +966,12 @@ public class RaftActorTest extends AbstractActorTest {
                 Map<String, String> peerAddresses = new HashMap<>();
                 peerAddresses.put(follower1Id, followerActor1.path().toString());
 
-                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
+                TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
                         MockRaftActor.props(persistenceId, peerAddresses,
                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
 
                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
+
                 leaderActor.getRaftActorContext().setCommitIndex(4);
                 leaderActor.getRaftActorContext().setLastApplied(4);
                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
@@ -962,7 +989,8 @@ public class RaftActorTest extends AbstractActorTest {
 
                 assertEquals(8, leaderActor.getReplicatedLog().size());
 
-                leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1));
+                leaderActor.onReceiveCommand(new CaptureSnapshot(6, 1, 4, 1, 4, 1));
+
                 leaderActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
                 verify(leaderActor.delegate).createSnapshot();
 
@@ -1029,7 +1057,7 @@ public class RaftActorTest extends AbstractActorTest {
                 Map<String, String> peerAddresses = new HashMap<>();
                 peerAddresses.put(leaderId, leaderActor1.path().toString());
 
-                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(),
+                TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
                         MockRaftActor.props(persistenceId, peerAddresses,
                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
 
@@ -1040,19 +1068,21 @@ public class RaftActorTest extends AbstractActorTest {
 
                 followerActor.waitForInitializeBehaviorComplete();
 
-                // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
+
                 Follower follower = new Follower(followerActor.getRaftActorContext());
                 followerActor.setCurrentBehavior(follower);
                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
 
+                // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
                 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
 
-                // log as indices 0-5
+                // log has indices 0-5
                 assertEquals(6, followerActor.getReplicatedLog().size());
 
                 //snapshot on 4
-                followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1));
+                followerActor.onReceiveCommand(new CaptureSnapshot(5, 1, 4, 1, 4, 1));
+
                 followerActor.getRaftActorContext().setSnapshotCaptureInitiated(true);
                 verify(followerActor.delegate).createSnapshot();
 
@@ -1090,7 +1120,7 @@ public class RaftActorTest extends AbstractActorTest {
                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
                 assertFalse(followerActor.getRaftActorContext().isSnapshotCaptureInitiated());
 
-                // capture snapshot reply should remove the snapshotted entries only
+                // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
                 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
 
@@ -1150,16 +1180,22 @@ public class RaftActorTest extends AbstractActorTest {
                 // create 5 entries in the log
                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
+
                 //set the snapshot index to 4 , 0 to 4 are snapshotted
                 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
+                //setting replicatedToAllIndex = 9, for the log to clear
+                leader.setReplicatedToAllIndex(9);
                 assertEquals(5, leaderActor.getReplicatedLog().size());
+                assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
                 assertEquals(5, leaderActor.getReplicatedLog().size());
+                assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
                 // set the 2nd follower nextIndex to 1 which has been snapshotted
                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
                 assertEquals(5, leaderActor.getReplicatedLog().size());
+                assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
                 // simulate a real snapshot
                 leaderActor.onReceiveCommand(new SendHeartBeat());
@@ -1182,7 +1218,7 @@ public class RaftActorTest extends AbstractActorTest {
                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
                 assertFalse(leaderActor.getRaftActorContext().isSnapshotCaptureInitiated());
 
-                assertEquals("Real snapshot didn't clear the log till lastApplied", 0, leaderActor.getReplicatedLog().size());
+                assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
 
                 //reply from a slow follower after should not raise errors
                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
index 6872c8f..b47df13 100644 (file)
@@ -21,6 +21,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import java.util.LinkedList;
 import java.util.List;
@@ -108,10 +109,14 @@ public class TestActorFactory implements AutoCloseable {
     }
 
     @Override
-    public void close() throws Exception {
-        for(ActorRef actor : createdActors){
-            LOG.info("Killing actor {}", actor);
-            actor.tell(PoisonPill.getInstance(), null);
-        }
+    public void close() {
+        new JavaTestKit(system) {{
+            for(ActorRef actor : createdActors) {
+                watch(actor);
+                LOG.info("Killing actor {}", actor);
+                actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+                expectTerminated(duration("5 seconds"), actor);
+            }
+        }};
     }
 }
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeaderTest.java
new file mode 100644 (file)
index 0000000..dd3ed23
--- /dev/null
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import static org.junit.Assert.assertTrue;
+import akka.actor.ActorRef;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
+import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import scala.concurrent.duration.FiniteDuration;
+
+public abstract class AbstractLeaderTest extends AbstractRaftActorBehaviorTest{
+
+    /**
+     * When we removed scheduling of heartbeat in the AbstractLeader constructor we ended up with a situation where
+     * if no follower responded to an initial AppendEntries heartbeats would not be sent to it. This test verifies
+     * that regardless of whether followers respond or not we schedule heartbeats.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testLeaderSchedulesHeartbeatsEvenWhenNoFollowersRespondToInitialAppendEntries() throws Exception {
+        logStart("testLeaderSchedulesHeartbeatsEvenWhenNoFollowersRespondToInitialAppendEntries");
+        new JavaTestKit(getSystem()) {{
+            String leaderActorId = actorFactory.generateActorId("leader");
+            String follower1ActorId = actorFactory.generateActorId("follower");
+            String follower2ActorId = actorFactory.generateActorId("follower");
+
+            TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
+                    actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
+            ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
+            ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
+
+            MockRaftActorContext leaderActorContext =
+                    new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
+
+            DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+            configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
+            configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+            leaderActorContext.setConfigParams(configParams);
+
+            leaderActorContext.setReplicatedLog(
+                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put(follower1ActorId,
+                    follower1Actor.path().toString());
+            peerAddresses.put(follower2ActorId,
+                    follower2Actor.path().toString());
+
+
+            leaderActorContext.setPeerAddresses(peerAddresses);
+
+            RaftActorBehavior leader = createBehavior(leaderActorContext);
+
+            leaderActor.underlyingActor().setBehavior(leader);
+
+            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+            List<SendHeartBeat> allMessages = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
+
+            // Need more than 1 heartbeat to be delivered because we waited for 1 second with heartbeat interval 200ms
+            assertTrue(String.format("%s messages is less than expected", allMessages.size()),
+                    allMessages.size() > 1);
+
+        }};
+    }
+
+}
index 42a7911..f56755b 100644 (file)
@@ -1,32 +1,52 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
+import static org.junit.Assert.assertEquals;
 import akka.actor.ActorRef;
 import akka.actor.Props;
-import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.junit.After;
+import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.AbstractActorTest;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
+public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+    protected final TestActorFactory actorFactory = new TestActorFactory(getSystem());
 
-public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
+    private final TestActorRef<MessageCollectorActor> behaviorActor = actorFactory.createTestActor(
+            Props.create(MessageCollectorActor.class), actorFactory.generateActorId("behavior"));
+
+    RaftActorBehavior behavior;
 
-    private final ActorRef behaviorActor = getSystem().actorOf(Props.create(
-        DoNothingActor.class));
+    @After
+    public void tearDown() throws Exception {
+        if(behavior != null) {
+            behavior.close();
+        }
+
+        actorFactory.close();
+    }
 
     /**
      * This test checks that when a new Raft RPC message is received with a newer
@@ -36,22 +56,19 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
      */
     @Test
     public void testHandleRaftRPCWithNewerTerm() throws Exception {
-        new JavaTestKit(getSystem()) {{
+        RaftActorContext actorContext = createActorContext();
 
-            assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(getTestActor(),
+        assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, behaviorActor,
                 createAppendEntriesWithNewerTerm());
 
-            assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(getTestActor(),
+        assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, behaviorActor,
                 createAppendEntriesReplyWithNewerTerm());
 
-            assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(getTestActor(),
+        assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, behaviorActor,
                 createRequestVoteWithNewerTerm());
 
-            assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(getTestActor(),
+        assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, behaviorActor,
                 createRequestVoteReplyWithNewerTerm());
-
-
-        }};
     }
 
 
@@ -63,144 +80,95 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
      * @throws Exception
      */
     @Test
-    public void testHandleAppendEntriesSenderTermLessThanReceiverTerm()
-        throws Exception {
-        new JavaTestKit(getSystem()) {{
-
-            MockRaftActorContext context = (MockRaftActorContext)
-                createActorContext();
+    public void testHandleAppendEntriesSenderTermLessThanReceiverTerm() throws Exception {
+            MockRaftActorContext context = createActorContext();
 
             // First set the receivers term to a high number (1000)
             context.getTermInformation().update(1000, "test");
 
-            AppendEntries appendEntries =
-                new AppendEntries(100, "leader-1", 0, 0, null, 101, -1);
+            AppendEntries appendEntries = new AppendEntries(100, "leader-1", 0, 0, null, 101, -1);
 
-            RaftActorBehavior behavior = createBehavior(context);
+            behavior = createBehavior(context);
 
             // Send an unknown message so that the state of the RaftActor remains unchanged
-            RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
+            RaftActorBehavior expected = behavior.handleMessage(behaviorActor, "unknown");
 
-            RaftActorBehavior raftBehavior =
-                behavior.handleMessage(getRef(), appendEntries);
+            RaftActorBehavior raftBehavior = behavior.handleMessage(behaviorActor, appendEntries);
 
-            assertEquals(expected, raftBehavior);
+            assertEquals("Raft state", expected.state(), raftBehavior.state());
 
             // Also expect an AppendEntriesReply to be sent where success is false
-            final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
-                "AppendEntriesReply") {
-                // do not put code outside this method, will run afterwards
-                protected Boolean match(Object in) {
-                    if (in instanceof AppendEntriesReply) {
-                        AppendEntriesReply reply = (AppendEntriesReply) in;
-                        return reply.isSuccess();
-                    } else {
-                        throw noMatch();
-                    }
-                }
-            }.get();
-
-            assertEquals(false, out);
-
-
-        }};
-    }
 
+            AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(
+                    behaviorActor, AppendEntriesReply.class);
 
-    @Test
-    public void testHandleAppendEntriesAddSameEntryToLog(){
-        new JavaTestKit(getSystem()) {
-            {
+            assertEquals("isSuccess", false, reply.isSuccess());
+    }
 
-                MockRaftActorContext context = (MockRaftActorContext)
-                    createActorContext();
 
-                // First set the receivers term to lower number
-                context.getTermInformation().update(2, "test");
+    @Test
+    public void testHandleAppendEntriesAddSameEntryToLog() throws Exception {
+        MockRaftActorContext context = createActorContext();
 
-                // Prepare the receivers log
-                MockRaftActorContext.SimpleReplicatedLog log =
-                    new MockRaftActorContext.SimpleReplicatedLog();
-                log.append(
-                    new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
+        context.getTermInformation().update(2, "test");
 
-                context.setReplicatedLog(log);
+        // Prepare the receivers log
+        MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("zero");
+        setLastLogEntry(context, 2, 0, payload);
 
-                List<ReplicatedLogEntry> entries = new ArrayList<>();
-                entries.add(
-                    new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
+        List<ReplicatedLogEntry> entries = new ArrayList<>();
+        entries.add(new MockRaftActorContext.MockReplicatedLogEntry(2, 0, payload));
 
-                AppendEntries appendEntries =
-                    new AppendEntries(2, "leader-1", -1, 1, entries, 0, -1);
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 2, -1);
 
-                RaftActorBehavior behavior = createBehavior(context);
+        behavior = createBehavior(context);
 
-                if (AbstractRaftActorBehaviorTest.this instanceof CandidateTest) {
-                    // Resetting the Candidates term to make sure it will match
-                    // the term sent by AppendEntries. If this was not done then
-                    // the test will fail because the Candidate will assume that
-                    // the message was sent to it from a lower term peer and will
-                    // thus respond with a failure
-                    context.getTermInformation().update(2, "test");
-                }
+        if (behavior instanceof Candidate) {
+            // Resetting the Candidates term to make sure it will match
+            // the term sent by AppendEntries. If this was not done then
+            // the test will fail because the Candidate will assume that
+            // the message was sent to it from a lower term peer and will
+            // thus respond with a failure
+            context.getTermInformation().update(2, "test");
+        }
 
-                // Send an unknown message so that the state of the RaftActor remains unchanged
-                RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
+        // Send an unknown message so that the state of the RaftActor remains unchanged
+        RaftActorBehavior expected = behavior.handleMessage(behaviorActor, "unknown");
 
-                RaftActorBehavior raftBehavior =
-                    behavior.handleMessage(getRef(), appendEntries);
+        RaftActorBehavior raftBehavior = behavior.handleMessage(behaviorActor, appendEntries);
 
-                assertEquals(expected, raftBehavior);
+        assertEquals("Raft state", expected.state(), raftBehavior.state());
 
-                assertEquals(1, log.size());
+        assertEquals("ReplicatedLog size", 1, context.getReplicatedLog().size());
 
+        handleAppendEntriesAddSameEntryToLogReply(behaviorActor);
+    }
 
-            }};
+    protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
+            throws Exception {
+        AppendEntriesReply reply = MessageCollectorActor.getFirstMatching(replyActor, AppendEntriesReply.class);
+        Assert.assertNull("Expected no AppendEntriesReply", reply);
     }
 
     /**
      * This test verifies that when a RequestVote is received by the RaftActor
-     * with a term which is greater than the RaftActors' currentTerm and the
-     * senders' log is more upto date than the receiver that the receiver grants
-     * the vote to the sender
+     * with the senders' log is more up to date than the receiver that the receiver grants
+     * the vote to the sender.
      */
     @Test
-    public void testHandleRequestVoteWhenSenderTermGreaterThanCurrentTermAndSenderLogMoreUpToDate() {
-        new JavaTestKit(getSystem()) {{
-
-            new Within(duration("1 seconds")) {
-                protected void run() {
-
-                    RaftActorBehavior behavior = createBehavior(
-                        createActorContext(behaviorActor));
-
-                    RaftActorBehavior raftBehavior = behavior.handleMessage(getTestActor(),
-                        new RequestVote(1000, "test", 10000, 999));
-
-                    if(!(behavior instanceof Follower)){
-                        assertTrue(raftBehavior instanceof Follower);
-                    } else {
-
-                        final Boolean out =
-                            new ExpectMsg<Boolean>(duration("1 seconds"),
-                                "RequestVoteReply") {
-                                // do not put code outside this method, will run afterwards
-                                protected Boolean match(Object in) {
-                                    if (in instanceof RequestVoteReply) {
-                                        RequestVoteReply reply =
-                                            (RequestVoteReply) in;
-                                        return reply.isVoteGranted();
-                                    } else {
-                                        throw noMatch();
-                                    }
-                                }
-                            }.get();
-
-                        assertEquals(true, out);
-                    }
-                }
-            };
-        }};
+    public void testHandleRequestVoteWhenSenderLogMoreUpToDate() {
+        MockRaftActorContext context = createActorContext();
+
+        behavior = createBehavior(context);
+
+        context.getTermInformation().update(1, "test");
+
+        behavior.handleMessage(behaviorActor, new RequestVote(context.getTermInformation().getCurrentTerm(),
+                "test", 10000, 999));
+
+        RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(behaviorActor,
+                RequestVoteReply.class);
+        assertEquals("isVoteGranted", true, reply.isVoteGranted());
     }
 
     /**
@@ -209,51 +177,24 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
      * log then the receiving RaftActor will not grant the vote to the sender
      */
     @Test
-    public void testHandleRequestVoteWhenSenderTermGreaterThanCurrentTermButSenderLogLessUptoDate() {
-        new JavaTestKit(getSystem()) {{
-
-            new Within(duration("1 seconds")) {
-                protected void run() {
-
-                    RaftActorContext actorContext =
-                        createActorContext(behaviorActor);
-
-                    MockRaftActorContext.SimpleReplicatedLog
-                        log = new MockRaftActorContext.SimpleReplicatedLog();
-                    log.append(
-                        new MockRaftActorContext.MockReplicatedLogEntry(20000,
-                            1000000, new MockRaftActorContext.MockPayload("")));
-
-                    ((MockRaftActorContext) actorContext).setReplicatedLog(log);
-
-                    RaftActorBehavior behavior = createBehavior(actorContext);
-
-                    RaftActorBehavior raftBehavior = behavior.handleMessage(getTestActor(),
-                        new RequestVote(1000, "test", 10000, 999));
-
-                    if(!(behavior instanceof Follower)){
-                        assertTrue(raftBehavior instanceof Follower);
-                    } else {
-                        final Boolean out =
-                            new ExpectMsg<Boolean>(duration("1 seconds"),
-                                "RequestVoteReply") {
-                                // do not put code outside this method, will run afterwards
-                                protected Boolean match(Object in) {
-                                    if (in instanceof RequestVoteReply) {
-                                        RequestVoteReply reply =
-                                            (RequestVoteReply) in;
-                                        return reply.isVoteGranted();
-                                    } else {
-                                        throw noMatch();
-                                    }
-                                }
-                            }.get();
-
-                        assertEquals(false, out);
-                    }
-                }
-            };
-        }};
+    public void testHandleRequestVoteWhenSenderLogLessUptoDate() {
+        MockRaftActorContext context = createActorContext();
+
+        behavior = createBehavior(context);
+
+        context.getTermInformation().update(1, "test");
+
+        int index = 2000;
+        setLastLogEntry(context, context.getTermInformation().getCurrentTerm(), index,
+                new MockRaftActorContext.MockPayload(""));
+
+        behavior.handleMessage(behaviorActor, new RequestVote(
+                context.getTermInformation().getCurrentTerm(), "test",
+                index - 1, context.getTermInformation().getCurrentTerm()));
+
+        RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(behaviorActor,
+                RequestVoteReply.class);
+        assertEquals("isVoteGranted", false, reply.isVoteGranted());
     }
 
 
@@ -265,87 +206,81 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
      */
     @Test
     public void testHandleRequestVoteWhenSenderTermLessThanCurrentTerm() {
-        new JavaTestKit(getSystem()) {{
-
-            new Within(duration("1 seconds")) {
-                protected void run() {
-
-                    RaftActorContext context =
-                        createActorContext(behaviorActor);
-
-                    context.getTermInformation().update(1000, null);
-
-                    RaftActorBehavior follower = createBehavior(context);
-
-                    follower.handleMessage(getTestActor(),
-                        new RequestVote(999, "test", 10000, 999));
-
-                    final Boolean out =
-                        new ExpectMsg<Boolean>(duration("1 seconds"),
-                            "RequestVoteReply") {
-                            // do not put code outside this method, will run afterwards
-                            protected Boolean match(Object in) {
-                                if (in instanceof RequestVoteReply) {
-                                    RequestVoteReply reply =
-                                        (RequestVoteReply) in;
-                                    return reply.isVoteGranted();
-                                } else {
-                                    throw noMatch();
-                                }
-                            }
-                        }.get();
-
-                    assertEquals(false, out);
-                }
-            };
-        }};
+        RaftActorContext context = createActorContext();
+
+        context.getTermInformation().update(1000, null);
+
+        behavior = createBehavior(context);
+
+        behavior.handleMessage(behaviorActor, new RequestVote(999, "test", 10000, 999));
+
+        RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(behaviorActor,
+                RequestVoteReply.class);
+        assertEquals("isVoteGranted", false, reply.isVoteGranted());
     }
 
     @Test
-    public void testFakeSnapshots() {
+    public void testPerformSnapshot() {
         MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), behaviorActor);
-        AbstractRaftActorBehavior behavior = new Leader(context);
-        context.getTermInformation().update(1, "leader");
+        AbstractRaftActorBehavior abstractBehavior =  (AbstractRaftActorBehavior) createBehavior(context);
+        if (abstractBehavior instanceof Candidate) {
+            return;
+        }
+
+        context.getTermInformation().update(1, "test");
 
-        //entry with 1 index=0 entry with replicatedToAllIndex = 0, does not do anything, returns the
+        //log has 1 entry with replicatedToAllIndex = 0, does not do anything, returns the
         context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
         context.setLastApplied(0);
-        assertEquals(-1, behavior.fakeSnapshot(0, -1));
+        abstractBehavior.performSnapshotWithoutCapture(0);
+        assertEquals(-1, abstractBehavior.getReplicatedToAllIndex());
         assertEquals(1, context.getReplicatedLog().size());
 
         //2 entries, lastApplied still 0, no purging.
-        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build());
+        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
         context.setLastApplied(0);
-        assertEquals(-1, behavior.fakeSnapshot(0, -1));
+        abstractBehavior.performSnapshotWithoutCapture(0);
+        assertEquals(-1, abstractBehavior.getReplicatedToAllIndex());
         assertEquals(2, context.getReplicatedLog().size());
 
         //2 entries, lastApplied still 0, no purging.
-        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,2,1).build());
+        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
         context.setLastApplied(1);
-        assertEquals(0, behavior.fakeSnapshot(0, -1));
+        abstractBehavior.performSnapshotWithoutCapture(0);
+        assertEquals(0, abstractBehavior.getReplicatedToAllIndex());
         assertEquals(1, context.getReplicatedLog().size());
 
         //5 entries, lastApplied =2 and replicatedIndex = 3, but since we want to keep the lastapplied, indices 0 and 1 will only get purged
-        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0,5,1).build());
+        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 5, 1).build());
         context.setLastApplied(2);
-        assertEquals(1, behavior.fakeSnapshot(3, 1));
+        abstractBehavior.performSnapshotWithoutCapture(3);
+        assertEquals(1, abstractBehavior.getReplicatedToAllIndex());
         assertEquals(3, context.getReplicatedLog().size());
 
-
+        // scenario where Last applied > Replicated to all index (becoz of a slow follower)
+        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+        context.setLastApplied(2);
+        abstractBehavior.performSnapshotWithoutCapture(1);
+        assertEquals(1, abstractBehavior.getReplicatedToAllIndex());
+        assertEquals(1, context.getReplicatedLog().size());
     }
 
-    protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(
-        ActorRef actorRef, RaftRPC rpc) {
 
-        RaftActorContext actorContext = createActorContext();
+    protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
+            ActorRef actorRef, RaftRPC rpc) throws Exception {
+
         Payload p = new MockRaftActorContext.MockPayload("");
-        setLastLogEntry(
-            (MockRaftActorContext) actorContext, 0, 0, p);
+        setLastLogEntry((MockRaftActorContext) actorContext, 1, 0, p);
+        actorContext.getTermInformation().update(1, "test");
+
+        RaftActorBehavior origBehavior = createBehavior(actorContext);
+        RaftActorBehavior raftBehavior = origBehavior.handleMessage(actorRef, rpc);
 
-        RaftActorBehavior raftBehavior = createBehavior(actorContext)
-            .handleMessage(actorRef, rpc);
+        assertEquals("New raft state", RaftState.Follower, raftBehavior.state());
+        assertEquals("New election term", rpc.getTerm(), actorContext.getTermInformation().getCurrentTerm());
 
-        assertTrue(raftBehavior instanceof Follower);
+        origBehavior.close();
+        raftBehavior.close();
     }
 
     protected MockRaftActorContext.SimpleReplicatedLog setLastLogEntry(
@@ -354,10 +289,9 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
             new MockRaftActorContext.MockReplicatedLogEntry(term, index, data));
     }
 
-    protected MockRaftActorContext.SimpleReplicatedLog setLastLogEntry(
-        MockRaftActorContext actorContext, ReplicatedLogEntry logEntry) {
-        MockRaftActorContext.SimpleReplicatedLog
-            log = new MockRaftActorContext.SimpleReplicatedLog();
+    protected MockRaftActorContext.SimpleReplicatedLog setLastLogEntry(MockRaftActorContext actorContext,
+            ReplicatedLogEntry logEntry) {
+        MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
         log.append(logEntry);
         actorContext.setReplicatedLog(log);
 
@@ -371,11 +305,11 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
         return createBehavior(createActorContext());
     }
 
-    protected RaftActorContext createActorContext() {
+    protected MockRaftActorContext createActorContext() {
         return new MockRaftActorContext();
     }
 
-    protected RaftActorContext createActorContext(ActorRef actor) {
+    protected MockRaftActorContext createActorContext(ActorRef actor) {
         return new MockRaftActorContext("test", getSystem(), actor);
     }
 
@@ -398,4 +332,18 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
     protected Object fromSerializableMessage(Object serializable){
         return SerializationUtils.fromSerializable(serializable);
     }
+
+    protected ByteString toByteString(Map<String, String> state) {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        try(ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+            oos.writeObject(state);
+            return ByteString.copyFrom(bos.toByteArray());
+        } catch (IOException e) {
+            throw new AssertionError("IOException occurred converting Map to Bytestring", e);
+        }
+    }
+
+    protected void logStart(String name) {
+        LoggerFactory.getLogger(LeaderTest.class).info("Starting " + name);
+    }
 }
index 0dc68c2..60f4552 100644 (file)
@@ -1,68 +1,48 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
+import static org.junit.Assert.assertEquals;
 import akka.actor.ActorRef;
 import akka.actor.Props;
-import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import org.junit.Assert;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
-import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
-
-import static org.junit.Assert.assertEquals;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 
 public class CandidateTest extends AbstractRaftActorBehaviorTest {
 
-    private final ActorRef candidateActor = getSystem().actorOf(Props.create(
-        DoNothingActor.class));
-
-    private final ActorRef peerActor1 = getSystem().actorOf(Props.create(
-        DoNothingActor.class));
+    private final TestActorRef<MessageCollectorActor> candidateActor = actorFactory.createTestActor(
+            Props.create(MessageCollectorActor.class), actorFactory.generateActorId("candidate"));
 
-    private final ActorRef peerActor2 = getSystem().actorOf(Props.create(
-        DoNothingActor.class));
+    private TestActorRef<MessageCollectorActor>[] peerActors;
 
-    private final ActorRef peerActor3 = getSystem().actorOf(Props.create(
-        DoNothingActor.class));
-
-    private final ActorRef peerActor4 = getSystem().actorOf(Props.create(
-        DoNothingActor.class));
-
-    private final Map<String, String> onePeer = new HashMap<>();
-    private final Map<String, String> twoPeers = new HashMap<>();
-    private final Map<String, String> fourPeers = new HashMap<>();
+    private RaftActorBehavior candidate;
 
     @Before
     public void setUp(){
-        onePeer.put(peerActor1.path().toString(),
-            peerActor1.path().toString());
-
-        twoPeers.put(peerActor1.path().toString(),
-            peerActor1.path().toString());
-        twoPeers.put(peerActor2.path().toString(),
-            peerActor2.path().toString());
-
-        fourPeers.put(peerActor1.path().toString(),
-            peerActor1.path().toString());
-        fourPeers.put(peerActor2.path().toString(),
-            peerActor2.path().toString());
-        fourPeers.put(peerActor3.path().toString(),
-            peerActor3.path().toString());
-        fourPeers.put(peerActor4.path().toString(),
-            peerActor3.path().toString());
+    }
 
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        if(candidate != null) {
+            candidate.close();
+        }
 
+        super.tearDown();
     }
 
     @Test
@@ -70,230 +50,167 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
         RaftActorContext raftActorContext = createActorContext();
         long expectedTerm = raftActorContext.getTermInformation().getCurrentTerm();
 
-        new Candidate(raftActorContext);
+        candidate = new Candidate(raftActorContext);
 
-        assertEquals(expectedTerm+1, raftActorContext.getTermInformation().getCurrentTerm());
-        assertEquals(raftActorContext.getId(), raftActorContext.getTermInformation().getVotedFor());
+        assertEquals("getCurrentTerm", expectedTerm+1, raftActorContext.getTermInformation().getCurrentTerm());
+        assertEquals("getVotedFor", raftActorContext.getId(), raftActorContext.getTermInformation().getVotedFor());
     }
 
     @Test
     public void testThatAnElectionTimeoutIsTriggered(){
-        new JavaTestKit(getSystem()) {{
-
-            new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) {
-                protected void run() {
-
-                    Candidate candidate = new Candidate(createActorContext(getTestActor()));
-
-                    final Boolean out = new ExpectMsg<Boolean>(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") {
-                        // do not put code outside this method, will run afterwards
-                        protected Boolean match(Object in) {
-                            if (in instanceof ElectionTimeout) {
-                                 return true;
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get();
-
-                    assertEquals(true, out);
-                }
-            };
-        }};
+         MockRaftActorContext actorContext = createActorContext();
+         candidate = new Candidate(actorContext);
+
+         MessageCollectorActor.expectFirstMatching(candidateActor, ElectionTimeout.class,
+                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
     }
 
     @Test
     public void testHandleElectionTimeoutWhenThereAreZeroPeers(){
         RaftActorContext raftActorContext = createActorContext();
-        Candidate candidate =
-            new Candidate(raftActorContext);
+        candidate = new Candidate(raftActorContext);
 
-        RaftActorBehavior raftBehavior =
+        RaftActorBehavior newBehavior =
             candidate.handleMessage(candidateActor, new ElectionTimeout());
 
-        Assert.assertTrue(raftBehavior instanceof Leader);
+        assertEquals("Behavior", RaftState.Leader, newBehavior.state());
     }
 
     @Test
-    public void testHandleElectionTimeoutWhenThereAreTwoNodesInCluster(){
-        MockRaftActorContext raftActorContext =
-            (MockRaftActorContext) createActorContext();
-        raftActorContext.setPeerAddresses(onePeer);
-        Candidate candidate =
-            new Candidate(raftActorContext);
-
-        RaftActorBehavior raftBehavior =
-            candidate.handleMessage(candidateActor, new ElectionTimeout());
+    public void testHandleElectionTimeoutWhenThereAreTwoNodeCluster(){
+        MockRaftActorContext raftActorContext = createActorContext();
+        raftActorContext.setPeerAddresses(setupPeers(1));
+        candidate = new Candidate(raftActorContext);
+
+        candidate = candidate.handleMessage(candidateActor, new ElectionTimeout());
 
-        Assert.assertTrue(raftBehavior instanceof Candidate);
+        assertEquals("Behavior", RaftState.Candidate, candidate.state());
     }
 
     @Test
-    public void testBecomeLeaderOnReceivingMajorityVotesInThreeNodesInCluster(){
-        MockRaftActorContext raftActorContext =
-            (MockRaftActorContext) createActorContext();
-        raftActorContext.setPeerAddresses(twoPeers);
-        Candidate candidate =
-            new Candidate(raftActorContext);
-
-        RaftActorBehavior behaviorOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
+    public void testBecomeLeaderOnReceivingMajorityVotesInThreeNodeCluster(){
+        MockRaftActorContext raftActorContext = createActorContext();
+        raftActorContext.setPeerAddresses(setupPeers(2));
+        candidate = new Candidate(raftActorContext);
 
-        Assert.assertTrue(behaviorOnFirstVote instanceof Leader);
+        candidate = candidate.handleMessage(peerActors[0], new RequestVoteReply(1, true));
 
+        assertEquals("Behavior", RaftState.Leader, candidate.state());
     }
 
     @Test
-    public void testBecomeLeaderOnReceivingMajorityVotesInFiveNodesInCluster(){
-        MockRaftActorContext raftActorContext =
-            (MockRaftActorContext) createActorContext();
-        raftActorContext.setPeerAddresses(fourPeers);
-        Candidate candidate =
-            new Candidate(raftActorContext);
+    public void testBecomeLeaderOnReceivingMajorityVotesInFiveNodeCluster(){
+        MockRaftActorContext raftActorContext = createActorContext();
+        raftActorContext.setPeerAddresses(setupPeers(4));
+        candidate = new Candidate(raftActorContext);
 
-        RaftActorBehavior behaviorOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
+        // First peers denies the vote.
+        candidate = candidate.handleMessage(peerActors[0], new RequestVoteReply(1, false));
 
-        RaftActorBehavior behaviorOnSecondVote = candidate.handleMessage(peerActor2, new RequestVoteReply(0, true));
+        assertEquals("Behavior", RaftState.Candidate, candidate.state());
 
-        Assert.assertTrue(behaviorOnFirstVote instanceof Candidate);
-        Assert.assertTrue(behaviorOnSecondVote instanceof Leader);
+        candidate = candidate.handleMessage(peerActors[1], new RequestVoteReply(1, true));
 
-    }
+        assertEquals("Behavior", RaftState.Candidate, candidate.state());
 
-    @Test
-    public void testResponseToAppendEntriesWithLowerTerm(){
-        new JavaTestKit(getSystem()) {{
-
-            new Within(duration("1 seconds")) {
-                protected void run() {
-
-                    Candidate candidate = new Candidate(createActorContext(getTestActor()));
-
-                    candidate.handleMessage(getTestActor(), new AppendEntries(0, "test", 0,0,Collections.<ReplicatedLogEntry>emptyList(), 0, -1));
-
-                    final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "AppendEntriesResponse") {
-                        // do not put code outside this method, will run afterwards
-                        protected Boolean match(Object in) {
-                            if (in instanceof AppendEntriesReply) {
-                                AppendEntriesReply reply = (AppendEntriesReply) in;
-                                return reply.isSuccess();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get();
-
-                    assertEquals(false, out);
-                }
-            };
-        }};
+        candidate = candidate.handleMessage(peerActors[2], new RequestVoteReply(1, true));
+
+        assertEquals("Behavior", RaftState.Leader, candidate.state());
     }
 
     @Test
-    public void testResponseToRequestVoteWithLowerTerm(){
-        new JavaTestKit(getSystem()) {{
-
-            new Within(duration("1 seconds")) {
-                protected void run() {
-
-                    Candidate candidate = new Candidate(createActorContext(getTestActor()));
-
-                    candidate.handleMessage(getTestActor(), new RequestVote(0, "test", 0, 0));
-
-                    final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "AppendEntriesResponse") {
-                        // do not put code outside this method, will run afterwards
-                        protected Boolean match(Object in) {
-                            if (in instanceof RequestVoteReply) {
-                                RequestVoteReply reply = (RequestVoteReply) in;
-                                return reply.isVoteGranted();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get();
-
-                    assertEquals(false, out);
-                }
-            };
-        }};
+    public void testResponseToHandleAppendEntriesWithLowerTerm() {
+        candidate = new Candidate(createActorContext());
+
+        setupPeers(1);
+        candidate.handleMessage(peerActors[0], new AppendEntries(1, "test", 0, 0,
+                Collections.<ReplicatedLogEntry>emptyList(), 0, -1));
+
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(
+                peerActors[0], AppendEntriesReply.class);
+        assertEquals("isSuccess", false, reply.isSuccess());
+        assertEquals("getTerm", 2, reply.getTerm());
     }
 
     @Test
-    public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
-        new JavaTestKit(getSystem()) {{
-
-            new Within(duration("1 seconds")) {
-                protected void run() {
-
-                    RaftActorContext context = createActorContext(getTestActor());
-
-                    context.getTermInformation().update(1000, null);
-
-                    // Once a candidate is created it will immediately increment the current term so after
-                    // construction the currentTerm should be 1001
-                    RaftActorBehavior follower = createBehavior(context);
-
-                    follower.handleMessage(getTestActor(), new RequestVote(1001, "test", 10000, 999));
-
-                    final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
-                        // do not put code outside this method, will run afterwards
-                        protected Boolean match(Object in) {
-                            if (in instanceof RequestVoteReply) {
-                                RequestVoteReply reply = (RequestVoteReply) in;
-                                return reply.isVoteGranted();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get();
-
-                    assertEquals(true, out);
-                }
-            };
-        }};
+    public void testResponseToRequestVoteWithLowerTerm() {
+        candidate = new Candidate(createActorContext());
+
+        setupPeers(1);
+        candidate.handleMessage(peerActors[0], new RequestVote(1, "test", 0, 0));
+
+        RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(
+                peerActors[0], RequestVoteReply.class);
+        assertEquals("isVoteGranted", false, reply.isVoteGranted());
+        assertEquals("getTerm", 2, reply.getTerm());
     }
 
     @Test
-    public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
-        new JavaTestKit(getSystem()) {{
+    public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForMatches() {
+        MockRaftActorContext context = createActorContext();
+        context.getTermInformation().update(1000, null);
 
-            new Within(duration("1 seconds")) {
-                protected void run() {
+        // Once a candidate is created it will immediately increment the current term so after
+        // construction the currentTerm should be 1001
+        candidate = new Candidate(context);
 
-                    RaftActorContext context = createActorContext(getTestActor());
+        setupPeers(1);
+        candidate.handleMessage(peerActors[0], new RequestVote(1001, context.getId(), 10000, 999));
 
-                    context.getTermInformation().update(1000, "test");
+        RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(
+                peerActors[0], RequestVoteReply.class);
+        assertEquals("isVoteGranted", true, reply.isVoteGranted());
+        assertEquals("getTerm", 1001, reply.getTerm());
+    }
+
+    @Test
+    public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForDoesNotMatch() {
+        MockRaftActorContext context = createActorContext();
+        context.getTermInformation().update(1000, null);
 
-                    RaftActorBehavior follower = createBehavior(context);
+        // Once a candidate is created it will immediately increment the current term so after
+        // construction the currentTerm should be 1001
+        candidate = new Candidate(context);
 
-                    follower.handleMessage(getTestActor(), new RequestVote(1001, "candidate", 10000, 999));
+        setupPeers(1);
 
-                    final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
-                        // do not put code outside this method, will run afterwards
-                        protected Boolean match(Object in) {
-                            if (in instanceof RequestVoteReply) {
-                                RequestVoteReply reply = (RequestVoteReply) in;
-                                return reply.isVoteGranted();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get();
+        // RequestVote candidate ID ("candidate2") does not match this candidate's votedFor
+        // (it votes for itself)
+        candidate.handleMessage(peerActors[0], new RequestVote(1001, "candidate2", 10000, 999));
 
-                    assertEquals(false, out);
-                }
-            };
-        }};
+        RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(
+                peerActors[0], RequestVoteReply.class);
+        assertEquals("isVoteGranted", false, reply.isVoteGranted());
+        assertEquals("getTerm", 1001, reply.getTerm());
     }
 
 
 
-    @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
+    @Override
+    protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
         return new Candidate(actorContext);
     }
 
-    @Override protected RaftActorContext createActorContext() {
-        return new MockRaftActorContext("test", getSystem(), candidateActor);
+    @Override protected MockRaftActorContext createActorContext() {
+        return new MockRaftActorContext("candidate", getSystem(), candidateActor);
     }
 
+    private Map<String, String> setupPeers(int count) {
+        Map<String, String> peerMap = new HashMap<>();
+        peerActors = new TestActorRef[count];
+        for(int i = 0; i < count; i++) {
+            peerActors[i] = actorFactory.createTestActor(Props.create(MessageCollectorActor.class),
+                    actorFactory.generateActorId("peer"));
+            peerMap.put("peer" + (i+1), peerActors[i].path().toString());
+        }
 
+        return peerMap;
+    }
+
+    @Override
+    protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
+            ActorRef actorRef, RaftRPC rpc) throws Exception {
<