Reduce use of scala.concurrent.duration.Duration
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index c3cd8ef80fdc04b91879356030c70fe2c30ee237..334bd8e2d2cc43de963999feb088dd872d6b5c11 100644 (file)
@@ -5,7 +5,6 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
@@ -107,7 +106,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailed
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
-import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -395,9 +393,7 @@ public class Shard extends RaftActor {
                         responseMessageSlicer.slice(SliceOptions.builder().identifier(success.getTarget())
                             .message(envelope.newSuccessEnvelope(success, executionTimeNanos))
                             .sendTo(envelope.getMessage().getReplyTo()).replyTo(self())
-                            .onFailureCallback(t -> {
-                                LOG.warn("Error slicing response {}", success, t);
-                            }).build()));
+                            .onFailureCallback(t -> LOG.warn("Error slicing response {}", success, t)).build()));
                 } else {
                     envelope.sendSuccess(success, executionTimeNanos);
                 }
@@ -474,7 +470,8 @@ public class Shard extends RaftActor {
             }
             if (cmp > 0) {
                 LOG.debug("{}: rejecting request from outdated client {}", persistenceId(), clientId);
-                throw new RetiredGenerationException(existing.getIdentifier().getGeneration());
+                throw new RetiredGenerationException(clientId.getGeneration(),
+                    existing.getIdentifier().getGeneration());
             }
 
             LOG.info("{}: retiring state {}, outdated by request from client {}", persistenceId(), existing, clientId);
@@ -789,9 +786,16 @@ public class Shard extends RaftActor {
     }
 
     private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
-        final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier();
-        store.closeTransactionChain(id, null);
-        store.purgeTransactionChain(id, null);
+        if (isLeader()) {
+            final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier();
+            // FIXME: CONTROLLER-1628: stage purge once no transactions are present
+            store.closeTransactionChain(id, null);
+            store.purgeTransactionChain(id, null);
+        } else if (getLeader() != null) {
+            getLeader().forward(closeTransactionChain, getContext());
+        } else {
+            LOG.warn("{}: Could not close transaction {}", persistenceId(), closeTransactionChain.getIdentifier());
+        }
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
@@ -859,7 +863,7 @@ public class Shard extends RaftActor {
         if (txCommitTimeoutCheckSchedule == null) {
             // Schedule a message to be periodically sent to check if the current in-progress
             // transaction should be expired and aborted.
-            FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
+            FiniteDuration period = FiniteDuration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
             txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
                     period, period, getSelf(),
                     TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());