BUG 3066 : Optimistic lock failed, on NetconfStateUpdate
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / RpcManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.remote.rpc;
10
11
12 import akka.actor.ActorRef;
13 import akka.actor.OneForOneStrategy;
14 import akka.actor.Props;
15 import akka.actor.SupervisorStrategy;
16 import akka.japi.Function;
17 import com.google.common.base.Preconditions;
18 import java.util.ArrayList;
19 import java.util.List;
20 import java.util.Set;
21 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
22 import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
23 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
24 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
25 import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
26 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
27 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
28 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import scala.concurrent.duration.Duration;
32
33 /**
34  * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown.
35  *
36  * It also starts the rpc listeners
37  */
38
39 public class RpcManager extends AbstractUntypedActor {
40
41     private static final Logger LOG = LoggerFactory.getLogger(RpcManager.class);
42
43     private SchemaContext schemaContext;
44     private ActorRef rpcBroker;
45     private ActorRef rpcRegistry;
46     private final RemoteRpcProviderConfig config;
47     private RpcListener rpcListener;
48     private RoutedRpcListener routeChangeListener;
49     private RemoteRpcImplementation rpcImplementation;
50     private final DOMRpcProviderService rpcProvisionRegistry;
51     private final DOMRpcService rpcServices;
52
53     private RpcManager(final SchemaContext schemaContext,
54                        final DOMRpcProviderService rpcProvisionRegistry,
55                        final DOMRpcService rpcSevices) {
56         this.schemaContext = schemaContext;
57         this.rpcProvisionRegistry = rpcProvisionRegistry;
58         rpcServices = rpcSevices;
59         config = new RemoteRpcProviderConfig(getContext().system().settings().config());
60
61         createRpcActors();
62         startListeners();
63     }
64
65
66       public static Props props(final SchemaContext schemaContext,
67               final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices) {
68           Preconditions.checkNotNull(schemaContext, "SchemaContext can not be null!");
69           Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!");
70           Preconditions.checkNotNull(rpcServices, "RpcService can not be null!");
71           return Props.create(RpcManager.class, schemaContext, rpcProvisionRegistry, rpcServices);
72       }
73
74     private void createRpcActors() {
75         LOG.debug("Create rpc registry and broker actors");
76
77         rpcRegistry =
78                 getContext().actorOf(Props.create(RpcRegistry.class).
79                     withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
80
81         rpcBroker =
82                 getContext().actorOf(RpcBroker.props(rpcServices, rpcRegistry).
83                     withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
84
85         final RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
86         rpcRegistry.tell(localRouter, self());
87     }
88
89     private void startListeners() {
90         LOG.debug("Registers rpc listeners");
91
92         rpcListener = new RpcListener(rpcRegistry);
93         routeChangeListener = new RoutedRpcListener(rpcRegistry);
94         rpcImplementation = new RemoteRpcImplementation(rpcBroker, config);
95
96         rpcServices.registerRpcListener(rpcListener);
97
98 //        rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
99 //        rpcProvisionRegistry.setRoutedRpcDefaultDelegate(rpcImplementation);
100         announceSupportedRpcs();
101     }
102
103     /**
104      * Add all the locally registered RPCs in the clustered routing table
105      */
106     private void announceSupportedRpcs(){
107         LOG.debug("Adding all supported rpcs to routing table");
108         final Set<RpcDefinition> currentlySupportedRpc = schemaContext.getOperations();
109         final List<DOMRpcIdentifier> rpcs = new ArrayList<>();
110         for (final RpcDefinition rpcDef : currentlySupportedRpc) {
111             rpcs.add(DOMRpcIdentifier.create(rpcDef.getPath()));
112         }
113         rpcListener.onRpcAvailable(rpcs);
114     }
115
116
117     @Override
118     protected void handleReceive(final Object message) throws Exception {
119       if(message instanceof UpdateSchemaContext) {
120         updateSchemaContext((UpdateSchemaContext) message);
121       }
122
123     }
124
125     private void updateSchemaContext(final UpdateSchemaContext message) {
126       schemaContext = message.getSchemaContext();
127       rpcBroker.tell(message, ActorRef.noSender());
128     }
129
130     @Override
131     public SupervisorStrategy supervisorStrategy() {
132       return new OneForOneStrategy(10, Duration.create("1 minute"),
133           new Function<Throwable, SupervisorStrategy.Directive>() {
134             @Override
135             public SupervisorStrategy.Directive apply(final Throwable t) {
136               return SupervisorStrategy.resume();
137             }
138           }
139       );
140     }
141 }