X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2FOpsRegistrar.java;fp=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2FOpsRegistrar.java;h=fdda197c3bf02f6dedf64859cf042438d483bf41;hp=0000000000000000000000000000000000000000;hb=927bce5688e4b9d33d3e5e9b769d8a0dba5ccdd4;hpb=a2b838f96589b502578fa4e15cef2769f886a378 diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/OpsRegistrar.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/OpsRegistrar.java new file mode 100644 index 0000000000..fdda197c3b --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/OpsRegistrar.java @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc; + +import static java.util.Objects.requireNonNull; + +import akka.actor.Address; +import akka.actor.Props; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; +import org.opendaylight.controller.remote.rpc.registry.ActionRegistry; +import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.Messages.UpdateRemoteActionEndpoints; +import org.opendaylight.controller.remote.rpc.registry.ActionRegistry.RemoteActionEndpoint; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints; +import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.RemoteRpcEndpoint; +import org.opendaylight.mdsal.dom.api.DOMActionImplementation; +import org.opendaylight.mdsal.dom.api.DOMActionProviderService; +import org.opendaylight.mdsal.dom.api.DOMRpcImplementation; +import org.opendaylight.mdsal.dom.api.DOMRpcProviderService; +import org.opendaylight.yangtools.concepts.ObjectRegistration; + +/** + * Actor handling registration of RPCs and Actions available on remote nodes with the local + * {@link DOMRpcProviderService} and {@link DOMActionProviderService}. + */ +final class OpsRegistrar extends AbstractUntypedActor { + private final Map> rpcRegs = new HashMap<>(); + private final Map> actionRegs = new HashMap<>(); + private final DOMRpcProviderService rpcProviderService; + private final RemoteOpsProviderConfig config; + private final DOMActionProviderService actionProviderService; + + OpsRegistrar(final RemoteOpsProviderConfig config, final DOMRpcProviderService rpcProviderService, + final DOMActionProviderService actionProviderService) { + this.config = requireNonNull(config); + this.rpcProviderService = requireNonNull(rpcProviderService); + this.actionProviderService = requireNonNull(actionProviderService); + } + + public static Props props(final RemoteOpsProviderConfig config, final DOMRpcProviderService rpcProviderService, + final DOMActionProviderService actionProviderService) { + return Props.create(OpsRegistrar.class, requireNonNull(config), + requireNonNull(rpcProviderService, "DOMRpcProviderService cannot be null"), + requireNonNull(actionProviderService, "DOMActionProviderService cannot be null")); + } + + @Override + public void postStop() throws Exception { + rpcRegs.clear(); + actionRegs.clear(); + + super.postStop(); + } + + @Override + protected void handleReceive(final Object message) { + if (message instanceof UpdateRemoteEndpoints) { + LOG.debug("Handling updateRemoteEndpoints message"); + updateRemoteRpcEndpoints(((UpdateRemoteEndpoints) message).getRpcEndpoints()); + } else if (message instanceof UpdateRemoteActionEndpoints) { + LOG.debug("Handling updateRemoteActionEndpoints message"); + updateRemoteActionEndpoints(((UpdateRemoteActionEndpoints) message).getActionEndpoints()); + } else { + unknownMessage(message); + } + } + + private void updateRemoteRpcEndpoints(final Map> rpcEndpoints) { + /* + * Updating RPC providers is a two-step process. We first add the newly-discovered RPCs and then close + * the old registration. This minimizes churn observed by listeners, as they will not observe RPC + * unavailability which would occur if we were to do it the other way around. + * + * Note that when an RPC moves from one remote node to another, we also do not want to expose the gap, + * hence we register all new implementations before closing all registrations. + */ + for (Entry> e : rpcEndpoints.entrySet()) { + LOG.debug("Updating RPC registrations for {}", e.getKey()); + + final Optional maybeEndpoint = e.getValue(); + if (maybeEndpoint.isPresent()) { + final RemoteRpcEndpoint endpoint = maybeEndpoint.get(); + final RemoteRpcImplementation impl = new RemoteRpcImplementation(endpoint.getRouter(), config); + rpcRegs.put(e.getKey(), rpcProviderService.registerRpcImplementation(impl, + endpoint.getRpcs())); + } else { + rpcRegs.remove(e.getKey()); + } + } + } + + /** + * Updates the action endpoints, Adding new registrations first before removing previous registrations. + */ + private void updateRemoteActionEndpoints(final Map> actionEndpoints) { + /* + * Updating Action providers is a two-step process. We first add the newly-discovered RPCs and then close + * the old registration. This minimizes churn observed by listeners, as they will not observe RPC + * unavailability which would occur if we were to do it the other way around. + * + * Note that when an Action moves from one remote node to another, we also do not want to expose the gap, + * hence we register all new implementations before closing all registrations. + */ + for (Entry> e : actionEndpoints.entrySet()) { + LOG.debug("Updating Action registrations for {}", e.getKey()); + + final Optional maybeEndpoint = e.getValue(); + if (maybeEndpoint.isPresent()) { + final RemoteActionEndpoint endpoint = maybeEndpoint.get(); + final RemoteActionImplementation impl = new RemoteActionImplementation(endpoint.getRouter(), config); + actionRegs.put(e.getKey(), + actionProviderService.registerActionImplementation(impl, endpoint.getActions())); + } else { + actionRegs.remove(e.getKey()); + } + } + } +}