BUG-4964: Bump to akka-2.4.1 24/32524/19
authorRobert Varga <robert.varga@pantheon.sk>
Wed, 13 Jan 2016 20:42:04 +0000 (21:42 +0100)
committerTom Pantelis <tpanteli@brocade.com>
Mon, 7 Mar 2016 12:11:05 +0000 (12:11 +0000)
This updates the imports and adapts to API changes. Requires Java 8 to
work. Also bumps netty to 3.10.5, as that is what remoting requires.

Change-Id: I3a3886ac75496d07ec03ae561a6df03ecaa5ad0c
Signed-off-by: Robert Varga <robert.varga@pantheon.sk>
20 files changed:
features/akka/pom.xml [deleted file]
features/akka/src/main/features/features.xml [deleted file]
features/mdsal/pom.xml
features/mdsal/src/main/features/features.xml
features/pom.xml
opendaylight/commons/opendaylight/pom.xml
opendaylight/md-sal/sal-akka-raft/pom.xml
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemorySnapshotStore.java
opendaylight/md-sal/sal-clustering-commons/pom.xml
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/factory-akka.conf
opendaylight/md-sal/sal-distributed-datastore/pom.xml
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-dummy-distributed-datastore/pom.xml
opendaylight/md-sal/sal-remoterpc-connector/pom.xml

diff --git a/features/akka/pom.xml b/features/akka/pom.xml
deleted file mode 100644 (file)
index b6d4501..0000000
+++ /dev/null
@@ -1,135 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
-
- This program and the accompanying materials are made available under the
- terms of the Eclipse Public License v1.0 which accompanies this distribution,
- and is available at http://www.eclipse.org/legal/epl-v10.html
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-   <modelVersion>4.0.0</modelVersion>
-   <parent>
-    <groupId>org.opendaylight.odlparent</groupId>
-    <artifactId>features-parent</artifactId>
-    <version>1.7.0-SNAPSHOT</version>
-    <relativePath/>
-   </parent>
-   <artifactId>features-akka</artifactId>
-   <groupId>org.opendaylight.controller</groupId>
-   <version>1.7.0-SNAPSHOT</version>
-   <packaging>pom</packaging>
-   <properties>
-      <features.file>features.xml</features.file>
-      <branding.version>1.3.0-SNAPSHOT</branding.version>
-      <karaf.resources.version>1.7.0-SNAPSHOT</karaf.resources.version>
-      <feature.test.version>0.9.0-SNAPSHOT</feature.test.version>
-      <karaf.empty.version>1.7.0-SNAPSHOT</karaf.empty.version>
-      <surefire.version>2.16</surefire.version>
-      <akka.version>2.3.14</akka.version>
-      <scala.version>2.11</scala.version>
-   </properties>
-    <dependencyManagement>
-        <dependencies>
-            <dependency>
-                <groupId>org.opendaylight.controller</groupId>
-                <artifactId>commons.opendaylight</artifactId>
-                <version>1.7.0-SNAPSHOT</version>
-                <type>pom</type>
-                <scope>import</scope>
-            </dependency>
-        </dependencies>
-    </dependencyManagement>
-   <dependencies>
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-reflect</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.typesafe</groupId>
-      <artifactId>config</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.typesafe.akka</groupId>
-      <artifactId>akka-actor_${scala.version}</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.typesafe.akka</groupId>
-      <artifactId>akka-slf4j_${scala.version}</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.typesafe.akka</groupId>
-      <artifactId>akka-osgi_${scala.version}</artifactId>
-    </dependency>
-    <dependency>
-        <groupId>org.uncommons.maths</groupId>
-        <artifactId>uncommons-maths</artifactId>
-        <exclusions>
-            <exclusion>
-                <groupId>jfree</groupId>
-                <artifactId>jcommon</artifactId>
-            </exclusion>
-            <exclusion>
-                <groupId>jfree</groupId>
-                <artifactId>jfreechart</artifactId>
-            </exclusion>
-        </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>com.google.protobuf</groupId>
-      <artifactId>protobuf-java</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>io.netty</groupId>
-      <artifactId>netty</artifactId>
-      <version>3.9.9.Final</version>
-    </dependency>
-    <dependency>
-      <groupId>com.typesafe.akka</groupId>
-      <artifactId>akka-remote_${scala.version}</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.typesafe.akka</groupId>
-      <artifactId>akka-cluster_${scala.version}</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.iq80.leveldb</groupId>
-      <artifactId>leveldb</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.fusesource.leveldbjni</groupId>
-      <artifactId>leveldbjni-all</artifactId>
-    </dependency>
-    <!-- test to validate features.xml -->
-    <dependency>
-      <groupId>org.opendaylight.odlparent</groupId>
-      <artifactId>features-test</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <!-- dependency for opendaylight-karaf-empty for use by testing -->
-    <dependency>
-      <groupId>org.opendaylight.controller</groupId>
-      <artifactId>opendaylight-karaf-empty</artifactId>
-      <version>${karaf.empty.version}</version>
-      <scope>test</scope>
-      <type>zip</type>
-    </dependency>
-    <!-- Uncomment this if you get an error : java.lang.NoSuchMethodError: org.slf4j.helpers.MessageFormatter.format(Ljava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)Lorg/slf4j/helpers/FormattingTuple;
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-simple</artifactId>
-      <version>1.7.2</version>
-    </dependency>
-    -->
-
-   </dependencies>
-   <scm>
-      <connection>scm:git:http://git.opendaylight.org/gerrit/controller.git</connection>
-      <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
-      <tag>HEAD</tag>
-      <url>https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=summary</url>
-   </scm>
-</project>
diff --git a/features/akka/src/main/features/features.xml b/features/akka/src/main/features/features.xml
deleted file mode 100644 (file)
index f0f004e..0000000
+++ /dev/null
@@ -1,49 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!-- vi: set et smarttab sw=4 tabstop=4: -->
-<!--
- Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
-
- This program and the accompanying materials are made available under the
- terms of the Eclipse Public License v1.0 which accompanies this distribution,
- and is available at http://www.eclipse.org/legal/epl-v10.html
--->
-<features name="odl-controller-${project.version}" xmlns="http://karaf.apache.org/xmlns/features/v1.2.0"
-          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-          xsi:schemaLocation="http://karaf.apache.org/xmlns/features/v1.2.0 http://karaf.apache.org/xmlns/features/v1.2.0">
-    <feature name='odl-akka-all' version='${project.version}' description='OpenDaylight :: Akka :: All'>
-        <feature version="${scala.version}">odl-akka-scala</feature>
-        <feature version="${akka.version}">odl-akka-system</feature>
-        <feature version="${akka.version}">odl-akka-clustering</feature>
-        <feature version='0.7'>odl-akka-leveldb</feature>
-        <feature version="${akka.version}">odl-akka-persistence</feature>
-    </feature>
-    <feature name="odl-akka-scala" description="Scala Runtime for OpenDaylight" version="${scala.version}">
-        <bundle>mvn:org.scala-lang/scala-library/{{VERSION}}</bundle>
-        <bundle>mvn:org.scala-lang/scala-reflect/{{VERSION}}</bundle>
-    </feature>
-    <feature name="odl-akka-system" description="Akka Actor Framework System Bundles" version="${akka.version}">
-        <feature version="${scala.version}">odl-akka-scala</feature>
-        <bundle>mvn:com.typesafe/config/{{VERSION}}</bundle>
-        <bundle>mvn:com.typesafe.akka/akka-actor_${scala.version}/${akka.version}</bundle>
-        <bundle>mvn:com.typesafe.akka/akka-slf4j_${scala.version}/${akka.version}</bundle>
-        <bundle>mvn:com.typesafe.akka/akka-osgi_${scala.version}/${akka.version}</bundle>
-    </feature>
-    <feature name="odl-akka-clustering" description="Akka Clustering" version="${akka.version}">
-        <feature version="${akka.version}">odl-akka-system</feature>
-        <bundle>wrap:mvn:org.uncommons.maths/uncommons-maths/{{VERSION}}</bundle>
-        <bundle>mvn:com.google.protobuf/protobuf-java/{{VERSION}}</bundle>
-        <bundle>mvn:io.netty/netty/{{VERSION}}</bundle>
-        <bundle>mvn:com.typesafe.akka/akka-remote_${scala.version}/${akka.version}</bundle>
-        <bundle>mvn:com.typesafe.akka/akka-cluster_${scala.version}/${akka.version}</bundle>
-    </feature>
-    <feature name='odl-akka-leveldb' description='LevelDB' version='0.7'>
-        <bundle>wrap:mvn:org.iq80.leveldb/leveldb/{{VERSION}}</bundle>
-        <bundle>mvn:org.fusesource.leveldbjni/leveldbjni-all/{{VERSION}}</bundle>
-    </feature>
-    <feature name='odl-akka-persistence' description='Akka Persistence' version="${akka.version}">
-        <feature version='0.7'>odl-akka-leveldb</feature>
-        <feature version="${akka.version}">odl-akka-system</feature>
-        <bundle>mvn:com.typesafe.akka/akka-persistence-experimental_${scala.version}/${akka.version}</bundle>
-        <bundle>wrap:mvn:com.google.protobuf/protobuf-java/{{VERSION}}$overwrite=merge&amp;DynamicImport-Package=org.opendaylight.controller.protobuff.messages.*;org.opendaylight.controller.cluster.raft.protobuff.client.messages.*</bundle>
-    </feature>
-</features>
index f609923..5d31ea2 100644 (file)
@@ -21,7 +21,8 @@
     <controller.mdsal.version>1.4.0-SNAPSHOT</controller.mdsal.version>
     <config.version>0.5.0-SNAPSHOT</config.version>
     <commons.opendaylight.version>1.7.0-SNAPSHOT</commons.opendaylight.version>
-    <akka.version>2.3.14</akka.version>
+    <akka.features.version>1.7.0-SNAPSHOT</akka.features.version>
+    <akka.version>2.4.1</akka.version>
     <features.file>features.xml</features.file>
     <config.configfile.directory>etc/opendaylight/karaf</config.configfile.directory>
     <config.clustering.configfile>05-clustering.xml</config.clustering.configfile>
       <type>xml</type>
     </dependency>
     <dependency>
-      <groupId>org.opendaylight.controller</groupId>
+      <groupId>org.opendaylight.odlparent</groupId>
       <artifactId>features-akka</artifactId>
-      <version>${commons.opendaylight.version}</version>
+      <version>${akka.features.version}</version>
       <classifier>features</classifier>
       <type>xml</type>
     </dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-distributed-datastore</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-dom-broker-config</artifactId>
+    </dependency>
 
     <!-- message-bus -->
     <dependency>
index 0bbc06b..0dadd3f 100644 (file)
@@ -13,7 +13,7 @@
     <repository>mvn:org.opendaylight.controller/features-config/{{VERSION}}/xml/features</repository>
     <repository>mvn:org.opendaylight.controller/features-config-persister/{{VERSION}}/xml/features</repository>
     <repository>mvn:org.opendaylight.controller/features-config-netty/{{VERSION}}/xml/features</repository>
-    <repository>mvn:org.opendaylight.controller/features-akka/{{VERSION}}/xml/features</repository>
+    <repository>mvn:org.opendaylight.odlparent/features-akka/{{VERSION}}/xml/features</repository>
     <feature name='odl-mdsal-all' version='${project.version}' description="OpenDaylight :: MDSAL :: All">
         <feature version='${project.version}'>odl-mdsal-broker</feature>
         <feature version='${project.version}'>odl-mdsal-broker-local</feature>
index 6300546..b580e8e 100644 (file)
@@ -16,7 +16,6 @@
     <module>config-netty</module>
     <module>mdsal</module>
     <module>protocol-framework</module>
-    <module>akka</module>
     <module>extras</module>
     <module>benchmark</module>
   </modules>
index 23033f5..35b333e 100644 (file)
@@ -16,7 +16,6 @@
   <properties>
     <mdsal.version>2.1.0-SNAPSHOT</mdsal.version>
     <mdsal.model.version>0.9.0-SNAPSHOT</mdsal.model.version>
-    <akka.version>2.3.14</akka.version>
     <appauth.version>0.7.0-SNAPSHOT</appauth.version>
     <archetype-app-northbound>0.3.0-SNAPSHOT</archetype-app-northbound>
     <arphandler.version>0.8.0-SNAPSHOT</arphandler.version>
@@ -91,8 +90,6 @@
     <jolokia-bridge.version>0.3.0-SNAPSHOT</jolokia-bridge.version>
     <karaf.branding.version>1.3.0-SNAPSHOT</karaf.branding.version>
     <karaf.shell.version>${karaf.version}</karaf.shell.version>
-    <leveldb.version>0.7</leveldb.version>
-    <leveldbjni.version>1.8-odl</leveldbjni.version>
     <lifecycle.mapping.version>1.0.0</lifecycle.mapping.version>
     <logging.bridge.version>0.7.0-SNAPSHOT</logging.bridge.version>
     <maven.plugin.api.version>3.0.5</maven.plugin.api.version>
     <northbound.jolokia.version>1.7.0-SNAPSHOT</northbound.jolokia.version>
     <opendaylight-l2-types.version>2013.08.27.9-SNAPSHOT</opendaylight-l2-types.version>
     <osgi-brandfragment.web.version>0.3.0-SNAPSHOT</osgi-brandfragment.web.version>
-    <protobuf.version>2.5.0</protobuf.version>
     <protocol-framework.version>0.8.0-SNAPSHOT</protocol-framework.version>
     <protocol_plugins.openflow.version>0.7.0-SNAPSHOT</protocol_plugins.openflow.version>
     <protocol_plugins.stub.version>0.7.0-SNAPSHOT</protocol_plugins.stub.version>
     <samples.loadbalancer.northbound.version>0.7.0-SNAPSHOT</samples.loadbalancer.northbound.version>
     <samples.simpleforwarding.version>0.7.0-SNAPSHOT</samples.simpleforwarding.version>
     <sanitytest.version>0.7.0-SNAPSHOT</sanitytest.version>
-    <scala.version>2.11</scala.version>
-    <scala.micro.version>7</scala.micro.version>
     <security.version>0.7.0-SNAPSHOT</security.version>
     <karaf.security.version>0.7.0-SNAPSHOT</karaf.security.version>
     <sitedeploy>dav:http://nexus.opendaylight.org/content/sites/site</sitedeploy>
     <topologymanager.version>0.7.0-SNAPSHOT</topologymanager.version>
     <topologymanager.shell.version>1.3.0-SNAPSHOT</topologymanager.shell.version>
     <troubleshoot.web.version>0.7.0-SNAPSHOT</troubleshoot.web.version>
-    <uncommons.maths.version>1.2.2a</uncommons.maths.version>
     <usermanager.implementation.version>0.7.0-SNAPSHOT</usermanager.implementation.version>
     <usermanager.northbound.version>0.3.0-SNAPSHOT</usermanager.northbound.version>
     <usermanager.version>0.7.0-SNAPSHOT</usermanager.version>
         <scope>provided</scope>
       </dependency>
 
-      <dependency>
-        <groupId>com.typesafe.akka</groupId>
-        <artifactId>akka-actor_${scala.version}</artifactId>
-        <version>${akka.version}</version>
-      </dependency>
-
-      <dependency>
-        <groupId>com.typesafe.akka</groupId>
-        <artifactId>akka-cluster_${scala.version}</artifactId>
-        <version>${akka.version}</version>
-      </dependency>
-
-      <dependency>
-        <groupId>com.typesafe.akka</groupId>
-        <artifactId>akka-persistence-experimental_${scala.version}</artifactId>
-        <version>${akka.version}</version>
-      </dependency>
-
-      <dependency>
-        <groupId>com.typesafe.akka</groupId>
-        <artifactId>akka-remote_${scala.version}</artifactId>
-        <version>${akka.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>com.typesafe.akka</groupId>
-        <artifactId>akka-testkit_${scala.version}</artifactId>
-        <version>${akka.version}</version>
-      </dependency>
-        <dependency>
-            <groupId>com.typesafe.akka</groupId>
-            <artifactId>akka-osgi_${scala.version}</artifactId>
-            <version>${akka.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.typesafe.akka</groupId>
-            <artifactId>akka-slf4j_${scala.version}</artifactId>
-            <version>${akka.version}</version>
-        </dependency>
       <dependency>
         <groupId>net.sourceforge.pmd</groupId>
         <artifactId>pmd</artifactId>
         <artifactId>gmaven-runtime-2.0</artifactId>
         <version>1.5</version>
       </dependency>
-      <dependency>
-        <groupId>org.uncommons.maths</groupId>
-        <artifactId>uncommons-maths</artifactId>
-        <version>${uncommons.maths.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>com.google.protobuf</groupId>
-        <artifactId>protobuf-java</artifactId>
-        <version>${protobuf.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.iq80.leveldb</groupId>
-        <artifactId>leveldb</artifactId>
-        <version>${leveldb.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.fusesource.leveldbjni</groupId>
-        <artifactId>leveldbjni-all</artifactId>
-        <version>${leveldbjni.version}</version>
-      </dependency>
 
       <!-- md-sal -->
       <dependency>
         <artifactId>reflections</artifactId>
         <version>0.9.9-RC1</version>
       </dependency>
-      <dependency>
-        <groupId>org.scala-lang</groupId>
-        <artifactId>scala-library</artifactId>
-        <version>${scala.version}.${scala.micro.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.scala-lang</groupId>
-        <artifactId>scala-reflect</artifactId>
-        <version>${scala.version}.${scala.micro.version}</version>
-      </dependency>
       <dependency>
         <groupId>org.opendaylight.controller</groupId>
         <artifactId>commons.logback_settings</artifactId>
index 7049509..6e73dde 100644 (file)
@@ -31,7 +31,7 @@
 
     <dependency>
       <groupId>com.typesafe.akka</groupId>
-      <artifactId>akka-persistence-experimental_${scala.version}</artifactId>
+      <artifactId>akka-persistence_${scala.version}</artifactId>
     </dependency>
 
     <dependency>
index d4f6247..923e1f1 100644 (file)
@@ -95,7 +95,7 @@ class RaftActorRecoverySupport {
 
                 // Delete all the akka snapshots as they will not be needed
                 persistentProvider.deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(),
-                        scala.Long.MaxValue()));
+                        scala.Long.MaxValue(), 0L, 0L));
 
                 // Since we cleaned out the journal, we need to re-write the current election info.
                 context.getTermInformation().updateAndPersist(context.getTermInformation().getCurrentTerm(),
index cffd422..197fa86 100644 (file)
@@ -407,7 +407,7 @@ public class SnapshotManager implements SnapshotState {
             }
 
             context.getPersistenceProvider().deleteSnapshots(new SnapshotSelectionCriteria(
-                    sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), Long.MAX_VALUE));
+                    sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), Long.MAX_VALUE, 0L, 0L));
 
             context.getPersistenceProvider().deleteMessages(lastSequenceNumber);
 
index 624369b..d37681e 100644 (file)
@@ -365,11 +365,11 @@ public class RaftActorTest extends AbstractActorTest {
         doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
         mockRaftActor.handleCommand(captureSnapshotReply);
 
-        SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(mock(SnapshotMetadata.class));
+        SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(new SnapshotMetadata("", 0L, 0L));
         doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
         mockRaftActor.handleCommand(saveSnapshotSuccess);
 
-        SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(mock(SnapshotMetadata.class), new Throwable());
+        SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(new SnapshotMetadata("", 0L, 0L), new Throwable());
         doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
         mockRaftActor.handleCommand(saveSnapshotFailure);
 
index 3189499..7909528 100644 (file)
@@ -8,9 +8,7 @@
 package org.opendaylight.controller.cluster.raft.utils;
 
 import akka.dispatch.Futures;
-import akka.japi.Procedure;
-import akka.persistence.PersistentConfirmation;
-import akka.persistence.PersistentId;
+import akka.persistence.AtomicWrite;
 import akka.persistence.PersistentImpl;
 import akka.persistence.PersistentRepr;
 import akka.persistence.journal.japi.AsyncWriteJournal;
@@ -22,10 +20,12 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import org.apache.commons.lang.SerializationUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -148,7 +148,9 @@ public class InMemoryJournal extends AsyncWriteJournal {
 
     @Override
     public Future<Void> doAsyncReplayMessages(final String persistenceId, final long fromSequenceNr,
-            final long toSequenceNr, final long max, final Procedure<PersistentRepr> replayCallback) {
+            final long toSequenceNr, final long max, final Consumer<PersistentRepr> replayCallback) {
+        LOG.trace("doAsyncReplayMessages for {}: fromSequenceNr: {}, toSequenceNr: {}", persistenceId,
+                fromSequenceNr,toSequenceNr);
         return Futures.future(new Callable<Void>() {
             @Override
             public Void call() throws Exception {
@@ -168,8 +170,8 @@ public class InMemoryJournal extends AsyncWriteJournal {
                         if (++count <= max && entry.getKey() >= fromSequenceNr && entry.getKey() <= toSequenceNr) {
                             PersistentRepr persistentMessage =
                                     new PersistentImpl(deserialize(entry.getValue()), entry.getKey(), persistenceId,
-                                            false, null, null);
-                            replayCallback.apply(persistentMessage);
+                                            null, false, null, null);
+                            replayCallback.accept(persistentMessage);
                         }
                     }
                 }
@@ -181,6 +183,8 @@ public class InMemoryJournal extends AsyncWriteJournal {
 
     @Override
     public Future<Long> doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) {
+        LOG.trace("doAsyncReadHighestSequenceNr for {}: fromSequenceNr: {}", persistenceId, fromSequenceNr);
+
         // Akka calls this during recovery.
         Map<Long, Object> journal = journals.get(persistenceId);
         if(journal == null) {
@@ -200,41 +204,36 @@ public class InMemoryJournal extends AsyncWriteJournal {
     }
 
     @Override
-    public Future<Void> doAsyncWriteMessages(final Iterable<PersistentRepr> messages) {
-        return Futures.future(new Callable<Void>() {
+    public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(final Iterable<AtomicWrite> messages) {
+        return Futures.future(new Callable<Iterable<Optional<Exception>>>() {
             @Override
-            public Void call() throws Exception {
-                for (PersistentRepr repr : messages) {
-                    LOG.trace("doAsyncWriteMessages: id: {}: seqNr: {}, payload: {}", repr.persistenceId(),
+            public Iterable<Optional<Exception>> call() throws Exception {
+                for (AtomicWrite write : messages) {
+                    // Copy to array - workaround for eclipse "ambiguous method" errors for toIterator, toIterable etc
+                    PersistentRepr[] array = new PersistentRepr[write.payload().size()];
+                    write.payload().copyToArray(array);
+                    for(PersistentRepr repr: array) {
+                        LOG.trace("doAsyncWriteMessages: id: {}: seqNr: {}, payload: {}", repr.persistenceId(),
                             repr.sequenceNr(), repr.payload());
 
-                    addEntry(repr.persistenceId(), repr.sequenceNr(), repr.payload());
+                        addEntry(repr.persistenceId(), repr.sequenceNr(), repr.payload());
 
-                    WriteMessagesComplete complete = writeMessagesComplete.get(repr.persistenceId());
-                    if(complete != null) {
-                        if(complete.ofType == null || complete.ofType.equals(repr.payload().getClass())) {
-                            complete.latch.countDown();
+                        WriteMessagesComplete complete = writeMessagesComplete.get(repr.persistenceId());
+                        if(complete != null) {
+                            if(complete.ofType == null || complete.ofType.equals(repr.payload().getClass())) {
+                                complete.latch.countDown();
+                            }
                         }
                     }
                 }
 
-                return null;
+                return Collections.emptyList();
             }
         }, context().dispatcher());
     }
 
     @Override
-    public Future<Void> doAsyncWriteConfirmations(Iterable<PersistentConfirmation> confirmations) {
-        return Futures.successful(null);
-    }
-
-    @Override
-    public Future<Void> doAsyncDeleteMessages(Iterable<PersistentId> messageIds, boolean permanent) {
-        return Futures.successful(null);
-    }
-
-    @Override
-    public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) {
+    public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) {
         LOG.trace("doAsyncDeleteMessagesTo: {}", toSequenceNr);
         Map<Long, Object> journal = journals.get(persistenceId);
         if(journal != null) {
index bf13089..a7e751c 100644 (file)
@@ -9,7 +9,6 @@
 package org.opendaylight.controller.cluster.raft.utils;
 
 import akka.dispatch.Futures;
-import akka.japi.Option;
 import akka.persistence.SelectedSnapshot;
 import akka.persistence.SnapshotMetadata;
 import akka.persistence.SnapshotSelectionCriteria;
@@ -21,6 +20,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -102,24 +102,24 @@ public class InMemorySnapshotStore extends SnapshotStore {
     }
 
     @Override
-    public Future<Option<SelectedSnapshot>> doLoadAsync(String persistenceId,
+    public Future<Optional<SelectedSnapshot>> doLoadAsync(String persistenceId,
             SnapshotSelectionCriteria snapshotSelectionCriteria) {
         List<StoredSnapshot> snapshotList = snapshots.get(persistenceId);
         if(snapshotList == null){
-            return Futures.successful(Option.<SelectedSnapshot>none());
+            return Futures.successful(Optional.<SelectedSnapshot>empty());
         }
 
         synchronized(snapshotList) {
             for(int i = snapshotList.size() - 1; i >= 0; i--) {
                 StoredSnapshot snapshot = snapshotList.get(i);
                 if(matches(snapshot, snapshotSelectionCriteria)) {
-                    return Futures.successful(Option.some(new SelectedSnapshot(snapshot.metadata,
+                    return Futures.successful(Optional.of(new SelectedSnapshot(snapshot.metadata,
                             snapshot.data)));
                 }
             }
         }
 
-        return Futures.successful(Option.<SelectedSnapshot>none());
+        return Futures.successful(Optional.<SelectedSnapshot>empty());
     }
 
     private static boolean matches(StoredSnapshot snapshot, SnapshotSelectionCriteria criteria) {
@@ -151,41 +151,36 @@ public class InMemorySnapshotStore extends SnapshotStore {
     }
 
     @Override
-    public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception {
-    }
+    public Future<Void> doDeleteAsync(SnapshotMetadata metadata) {
+        List<StoredSnapshot> snapshotList = snapshots.get(metadata.persistenceId());
 
-    @Override
-    public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception {
-        List<StoredSnapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
-
-        if(snapshotList == null){
-            return;
-        }
-
-        synchronized (snapshotList) {
-            for(int i=0;i<snapshotList.size(); i++){
-                StoredSnapshot snapshot = snapshotList.get(i);
-                if(snapshotMetadata.equals(snapshot.metadata)){
-                    snapshotList.remove(i);
-                    break;
+        if (snapshotList != null) {
+            synchronized (snapshotList) {
+                for(int i=0;i<snapshotList.size(); i++){
+                    StoredSnapshot snapshot = snapshotList.get(i);
+                    if(metadata.equals(snapshot.metadata)){
+                        snapshotList.remove(i);
+                        break;
+                    }
                 }
             }
         }
+
+        return Futures.successful(null);
     }
 
     @Override
-    public void doDelete(String persistentId, SnapshotSelectionCriteria snapshotSelectionCriteria)
-            throws Exception {
-        LOG.trace("doDelete: persistentId {}: maxSequenceNr: {}: maxTimestamp {}", persistentId,
-                snapshotSelectionCriteria.maxSequenceNr(), snapshotSelectionCriteria.maxTimestamp());
+    public Future<Void> doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
+        LOG.trace("doDelete: persistentId {}: maxSequenceNr: {}: maxTimestamp {}", persistenceId,
+            criteria.maxSequenceNr(), criteria.maxTimestamp());
 
-        List<StoredSnapshot> snapshotList = snapshots.get(persistentId);
+        List<StoredSnapshot> snapshotList = snapshots.get(persistenceId);
         if(snapshotList != null){
             synchronized (snapshotList) {
                 Iterator<StoredSnapshot> iter = snapshotList.iterator();
                 while(iter.hasNext()) {
                     StoredSnapshot s = iter.next();
-                    if(matches(s, snapshotSelectionCriteria)) {
+                    if(matches(s, criteria)) {
                         LOG.trace("Deleting snapshot for sequenceNr: {}, timestamp: {}: {}",
                                 s.metadata.sequenceNr(), s.metadata.timestamp(), s.data);
 
@@ -195,10 +190,12 @@ public class InMemorySnapshotStore extends SnapshotStore {
             }
         }
 
-        CountDownLatch latch = snapshotDeletedLatches.get(persistentId);
+        CountDownLatch latch = snapshotDeletedLatches.get(persistenceId);
         if(latch != null) {
             latch.countDown();
         }
+
+        return Futures.successful(null);
     }
 
     private static class StoredSnapshot {
index 5e2cb6e..431f3be 100644 (file)
@@ -55,7 +55,7 @@
     </dependency>
     <dependency>
       <groupId>com.typesafe.akka</groupId>
-      <artifactId>akka-persistence-experimental_${scala.version}</artifactId>
+      <artifactId>akka-persistence_${scala.version}</artifactId>
     </dependency>
     <dependency>
       <groupId>com.typesafe.akka</groupId>
index eb4903e..9d80299 100644 (file)
@@ -34,6 +34,7 @@ odl-cluster-data {
     logger-startup-timeout = 300s
 
     actor {
+      warn-about-java-serializer-usage = off
       provider = "akka.cluster.ClusterActorRefProvider"
       serializers {
         java = "akka.serialization.JavaSerializer"
@@ -81,5 +82,10 @@ odl-cluster-data {
       ]
 
     }
+
+    persistence {
+      journal.plugin = akka.persistence.journal.leveldb
+      snapshot-store.plugin = akka.persistence.snapshot-store.local
+    }
   }
 }
index f21aa5a..935af23 100644 (file)
@@ -47,7 +47,7 @@
     </dependency>
     <dependency>
       <groupId>com.typesafe.akka</groupId>
-      <artifactId>akka-persistence-experimental_${scala.version}</artifactId>
+      <artifactId>akka-persistence_${scala.version}</artifactId>
     </dependency>
     <dependency>
       <groupId>com.typesafe.akka</groupId>
index 1c0a5b5..439c2ba 100644 (file)
@@ -12,7 +12,6 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
 import akka.actor.Props;
-import akka.persistence.RecoveryFailure;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
@@ -195,23 +194,12 @@ public class Shard extends RaftActor {
 
     @Override
     public void onReceiveRecover(final Object message) throws Exception {
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(),
-                message.getClass().toString(), getSender());
-        }
-
-        if (message instanceof RecoveryFailure){
-            LOG.error("{}: Recovery failed because of this cause",
-                    persistenceId(), ((RecoveryFailure) message).cause());
+        LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(), message.getClass(),
+            getSender());
 
-            // Even though recovery failed, we still need to finish our recovery, eg send the
-            // ActorInitialized message and start the txCommitTimeoutCheckSchedule.
-            onRecoveryComplete();
-        } else {
-            super.onReceiveRecover(message);
-            if(LOG.isTraceEnabled()) {
-                appendEntriesReplyTracker.begin();
-            }
+        super.onReceiveRecover(message);
+        if (LOG.isTraceEnabled()) {
+            appendEntriesReplyTracker.begin();
         }
     }
 
index 8ef3f6f..aec94ec 100644 (file)
@@ -1275,7 +1275,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
         LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
             persistenceId());
-        deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), (successMessage.metadata().timestamp() - 1)));
+        deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
+            0, 0));
     }
 
     private static class ForwardedAddServerReply {
index 084dc82..7b78da2 100644 (file)
@@ -497,6 +497,8 @@ public class DistributedDataStoreIntegrationTest {
             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
 
+            InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
+
             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
 
             // Create the write Tx
@@ -567,6 +569,8 @@ public class DistributedDataStoreIntegrationTest {
             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
 
+            InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
+
             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
 
             // Create the read-write Tx
index 7ac2154..0ea13fa 100644 (file)
@@ -27,7 +27,7 @@
 
     <dependency>
       <groupId>com.typesafe.akka</groupId>
-      <artifactId>akka-persistence-experimental_${scala.version}</artifactId>
+      <artifactId>akka-persistence_${scala.version}</artifactId>
     </dependency>
 
     <dependency>
index b89c04f..8958dd8 100644 (file)
       <artifactId>akka-osgi_${scala.version}</artifactId>
     </dependency>
 
-  <dependency>
-     <groupId>com.typesafe.akka</groupId>
-     <artifactId>akka-slf4j_${scala.version}</artifactId>
-  </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-slf4j_${scala.version}</artifactId>
+    </dependency>
 
-      <dependency>
-          <groupId>com.typesafe.akka</groupId>
-          <artifactId>akka-persistence-experimental_${scala.version}</artifactId>
-      </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-persistence_${scala.version}</artifactId>
+    </dependency>
     <!-- SAL Dependencies -->
 
     <dependency>