From: Ravikumar Chiguruvada Date: Tue, 8 Apr 2014 10:34:56 +0000 (+0530) Subject: Upgrading clustermanager to use infinispan 6.0.2Final. X-Git-Tag: autorelease-tag-v20140601202136_82eb3f9~247^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=eed877b90a0f8ed70150d710a9479e5a11409259 Upgrading clustermanager to use infinispan 6.0.2Final. - Removed unnecessary BaseInterceptor clone which was needed for 5.3.0 version - Bumped services_implementation implementation version to indicate this is actually different. - Modified the parent pom version so version propagate in other places Signed-off-by: Ravikumar Chiguruvada Signed-off-by: Giovanni Meo Change-Id: I062c72153a2ffe6949f3a3c8d2f0cdaa826bb65c --- diff --git a/opendaylight/clustering/services_implementation/pom.xml b/opendaylight/clustering/services_implementation/pom.xml index d7a3db3841..1b72630b8c 100644 --- a/opendaylight/clustering/services_implementation/pom.xml +++ b/opendaylight/clustering/services_implementation/pom.xml @@ -15,7 +15,7 @@ clustering.services-implementation - 0.4.2-SNAPSHOT + 0.4.3-SNAPSHOT bundle @@ -68,7 +68,7 @@ * - infinispan-core,jgroups,jboss-marshalling-river,jboss-marshalling,jboss-logging,staxmapper,narayana-jta;type=!pom;inline=false + infinispan-core,infinispan-commons,jgroups,jboss-marshalling-river,jboss-marshalling,jboss-logging,staxmapper,narayana-jta;type=!pom;inline=false true @@ -116,7 +116,7 @@ org.infinispan:infinispan-core:* - org.infinispan:infinispan-core:[5.3.0.Final] + org.infinispan:infinispan-core:[6.0.2.Final] @@ -131,7 +131,7 @@ org.infinispan infinispan-core - 5.3.0.Final + 6.0.2.Final org.opendaylight.controller diff --git a/opendaylight/clustering/services_implementation/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java b/opendaylight/clustering/services_implementation/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java deleted file mode 100644 index d495e454a6..0000000000 --- a/opendaylight/clustering/services_implementation/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java +++ /dev/null @@ -1,252 +0,0 @@ -/* - * JBoss, Home of Professional Open Source - * Copyright 2009 Red Hat Inc. and/or its affiliates and other - * contributors as indicated by the @author tags. All rights reserved. - * See the copyright.txt in the distribution for a full listing of - * individual contributors. - * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. - */ -package org.infinispan.interceptors.distribution; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.infinispan.CacheException; -import org.infinispan.commands.FlagAffectedCommand; -import org.infinispan.commands.remote.ClusteredGetCommand; -import org.infinispan.commands.write.DataWriteCommand; -import org.infinispan.commands.write.WriteCommand; -import org.infinispan.container.entries.InternalCacheEntry; -import org.infinispan.container.entries.InternalCacheValue; -import org.infinispan.context.InvocationContext; -import org.infinispan.context.impl.TxInvocationContext; -import org.infinispan.distribution.DistributionManager; -import org.infinispan.factories.annotations.Inject; -import org.infinispan.interceptors.ClusteringInterceptor; -import org.infinispan.interceptors.locking.ClusteringDependentLogic; -import org.infinispan.remoting.responses.ClusteredGetResponseValidityFilter; -import org.infinispan.remoting.responses.ExceptionResponse; -import org.infinispan.remoting.responses.Response; -import org.infinispan.remoting.responses.SuccessfulResponse; -import org.infinispan.remoting.rpc.ResponseFilter; -import org.infinispan.remoting.rpc.ResponseMode; -import org.infinispan.remoting.rpc.RpcOptions; -import org.infinispan.remoting.transport.Address; -import org.infinispan.transaction.xa.GlobalTransaction; -import org.infinispan.util.logging.Log; -import org.infinispan.util.logging.LogFactory; - - -/** - * Base class for distribution of entries across a cluster. - * - * @author Manik Surtani - * @author Mircea.Markus@jboss.com - * @author Pete Muir - * @author Dan Berindei - * @since 4.0 - */ -public abstract class BaseDistributionInterceptor extends ClusteringInterceptor { - - protected DistributionManager dm; - - protected ClusteringDependentLogic cdl; - - private static final Log log = LogFactory.getLog(BaseDistributionInterceptor.class); - - @Override - protected Log getLog() { - return log; - } - - @Inject - public void injectDependencies(DistributionManager distributionManager, ClusteringDependentLogic cdl) { - this.dm = distributionManager; - this.cdl = cdl; - } - - @Override - protected final InternalCacheEntry retrieveFromRemoteSource(Object key, InvocationContext ctx, boolean acquireRemoteLock, FlagAffectedCommand command) throws Exception { - GlobalTransaction gtx = acquireRemoteLock ? ((TxInvocationContext)ctx).getGlobalTransaction() : null; - ClusteredGetCommand get = cf.buildClusteredGetCommand(key, command.getFlags(), acquireRemoteLock, gtx); - - List
targets = new ArrayList
(stateTransferManager.getCacheTopology().getReadConsistentHash().locateOwners(key)); - // if any of the recipients has left the cluster since the command was issued, just don't wait for its response - targets.retainAll(rpcManager.getTransport().getMembers()); - ResponseFilter filter = new ClusteredGetResponseValidityFilter(targets, rpcManager.getAddress()); - RpcOptions options = rpcManager.getRpcOptionsBuilder(ResponseMode.WAIT_FOR_VALID_RESPONSE, false) - .responseFilter(filter).build(); - Map responses = rpcManager.invokeRemotely(targets, get, options); - - if (!responses.isEmpty()) { - for (Response r : responses.values()) { - if (r instanceof SuccessfulResponse) { - InternalCacheValue cacheValue = (InternalCacheValue) ((SuccessfulResponse) r).getResponseValue(); - return cacheValue.toInternalCacheEntry(key); - } - } - } - - // TODO If everyone returned null, and the read CH has changed, retry the remote get. - // Otherwise our get command might be processed by the old owners after they have invalidated their data - // and we'd return a null even though the key exists on - return null; - } - - protected final Object handleNonTxWriteCommand(InvocationContext ctx, DataWriteCommand command) throws Throwable { - if (ctx.isInTxScope()) { - throw new CacheException("Attempted execution of non-transactional write command in a transactional invocation context"); - } - - RecipientGenerator recipientGenerator = new SingleKeyRecipientGenerator(command.getKey()); - - // see if we need to load values from remote sources first - remoteGetBeforeWrite(ctx, command, recipientGenerator); - - // if this is local mode then skip distributing - if (isLocalModeForced(command)) { - return invokeNextInterceptor(ctx, command); - } - - boolean isSync = isSynchronous(command); - if (!ctx.isOriginLocal()) { - Object returnValue = invokeNextInterceptor(ctx, command); - Address primaryOwner = cdl.getPrimaryOwner(command.getKey()); - if (primaryOwner.equals(rpcManager.getAddress())) { - if (command.isConditional() && !command.isSuccessful()) { - log.tracef( - "Skipping the replication of the conditional command as it did not succeed on primary owner (%s).", - command); - return returnValue; - } - rpcManager.invokeRemotely(recipientGenerator.generateRecipients(), command, - rpcManager.getDefaultRpcOptions(isSync)); - } else { - log.tracef("Didn't invoke RPC because primaryOwner (%s) didn't match this node (%s)", primaryOwner, - rpcManager.getAddress()); - log.tracef("Hashcode is (%s) for Key (%s)", - command.getKey().hashCode(), command.getKey()); - } - return returnValue; - } else { - Address primaryOwner = cdl.getPrimaryOwner(command.getKey()); - if (primaryOwner.equals(rpcManager.getAddress())) { - Object result = invokeNextInterceptor(ctx, command); - if (command.isConditional() && !command.isSuccessful()) { - log.tracef("Skipping the replication of the conditional command as it did not succeed on primary owner (%s).", command); - return result; - } - List
recipients = recipientGenerator.generateRecipients(); - log.tracef("I'm the primary owner, sending the command to all (%s) the recipients in order to be applied.", recipients); - // check if a single owner has been configured and the target for the key is the local address - boolean isSingleOwnerAndLocal = cacheConfiguration.clustering().hash().numOwners() == 1 - && recipients != null - && recipients.size() == 1 - && recipients.get(0).equals(rpcManager.getTransport().getAddress()); - if (!isSingleOwnerAndLocal) { - rpcManager.invokeRemotely(recipients, command, rpcManager.getDefaultRpcOptions(isSync)); - } - return result; - } else { - log.tracef("I'm not the primary owner, so sending the command to the primary owner(%s) in order to be forwarded", primaryOwner); - log.tracef("Hashcode is (%s) for Key (%s)", command.getKey().hashCode(), command.getKey()); - - Object localResult = invokeNextInterceptor(ctx, command); - boolean isSyncForwarding = isSync || isNeedReliableReturnValues(command); - Map addressResponseMap = rpcManager.invokeRemotely(Collections.singletonList(primaryOwner), command, - rpcManager.getDefaultRpcOptions(isSyncForwarding)); - if (!isSyncForwarding) return localResult; - - return getResponseFromPrimaryOwner(primaryOwner, addressResponseMap); - } - } - } - - private Object getResponseFromPrimaryOwner(Address primaryOwner, Map addressResponseMap) { - Response fromPrimaryOwner = addressResponseMap.get(primaryOwner); - if (fromPrimaryOwner == null) { - log.tracef("Primary owner %s returned null", primaryOwner); - return null; - } - if (!fromPrimaryOwner.isSuccessful()) { - Throwable cause = fromPrimaryOwner instanceof ExceptionResponse ? ((ExceptionResponse)fromPrimaryOwner).getException() : null; - throw new CacheException("Got unsuccessful response from primary owner: " + fromPrimaryOwner, cause); - } else { - return ((SuccessfulResponse) fromPrimaryOwner).getResponseValue(); - } - } - - protected abstract void remoteGetBeforeWrite(InvocationContext ctx, WriteCommand command, RecipientGenerator keygen) throws Throwable; - - interface RecipientGenerator { - - Collection getKeys(); - - List
generateRecipients(); - } - - class SingleKeyRecipientGenerator implements RecipientGenerator { - private final Object key; - private final Set keys; - private List
recipients = null; - - SingleKeyRecipientGenerator(Object key) { - this.key = key; - keys = Collections.singleton(key); - } - - @Override - public List
generateRecipients() { - if (recipients == null) { - recipients = cdl.getOwners(key); - } - return recipients; - } - - @Override - public Collection getKeys() { - return keys; - } - } - - class MultipleKeysRecipientGenerator implements RecipientGenerator { - - private final Collection keys; - private List
recipients = null; - - MultipleKeysRecipientGenerator(Collection keys) { - this.keys = keys; - } - - @Override - public List
generateRecipients() { - if (recipients == null) { - recipients = cdl.getOwners(keys); - } - return recipients; - } - - @Override - public Collection getKeys() { - return keys; - } - } -} diff --git a/opendaylight/clustering/services_implementation/src/main/resources/config/infinispan-config.xml b/opendaylight/clustering/services_implementation/src/main/resources/config/infinispan-config.xml index 5ec4325c7f..5fb0ddff60 100644 --- a/opendaylight/clustering/services_implementation/src/main/resources/config/infinispan-config.xml +++ b/opendaylight/clustering/services_implementation/src/main/resources/config/infinispan-config.xml @@ -1,4 +1,4 @@ - + @@ -11,18 +11,14 @@ jmxDomain="org.infinispan" cacheManagerName="SampleCacheManager"/> - - - - - - + + - - - + + + + + + + + + diff --git a/opendaylight/commons/opendaylight/pom.xml b/opendaylight/commons/opendaylight/pom.xml index 3690b8542b..e638ba68a7 100644 --- a/opendaylight/commons/opendaylight/pom.xml +++ b/opendaylight/commons/opendaylight/pom.xml @@ -102,6 +102,7 @@ 0.4.2-SNAPSHOT 0.4.2-SNAPSHOT 0.4.2-SNAPSHOT + 0.4.3-SNAPSHOT 1.17 7.0.42 @@ -809,7 +810,7 @@ org.opendaylight.controller clustering.services-implementation - ${controller.version} + ${clustering.services_implementation.version} org.opendaylight.controller