/*
 * Copyright (C) 2014 EBay Software Foundation
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 *
 * Authors : Ashwin Raveendran
 */
10 package org.opendaylight.ovsdb.lib;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ExecutorService;

import org.opendaylight.ovsdb.lib.jsonrpc.Params;
import org.opendaylight.ovsdb.lib.message.MonitorRequest;
import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
import org.opendaylight.ovsdb.lib.message.TableUpdate;
import org.opendaylight.ovsdb.lib.message.TableUpdates;
import org.opendaylight.ovsdb.lib.message.TransactBuilder;
import org.opendaylight.ovsdb.lib.message.UpdateNotification;
import org.opendaylight.ovsdb.lib.operations.Operation;
import org.opendaylight.ovsdb.lib.operations.OperationResult;
import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
import org.opendaylight.ovsdb.lib.schema.TableSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
47 public class OvsDBClientImpl implements OvsDBClient {
/** Logger for this client; protected so that subclasses can log through it. */
protected static final Logger logger = LoggerFactory.getLogger(OvsDBClientImpl.class);

/** Executor supplied by the creator of this client. */
private ExecutorService executorService;

/** RPC layer through which every request of this client is issued; replaceable via setRpc(). */
private OvsdbRPC rpc;

/** Database schemas known to this client, keyed by database name. */
private Map<String, DatabaseSchema> schema = Maps.newHashMap();

/** Monitor callbacks and their schemas, keyed by the id of the monitor handle. */
private Map<String, CallbackContext> monitorCallbacks = Maps.newHashMap();

/** Returned by getExceptions(); NOTE(review): nothing visible in this class ever assigns it. */
private Queue<Throwable> exceptions;

/** Notification listener registered with the RPC layer; null until setupUpdateListener() installs it. */
private OvsdbRPC.Callback rpcCallback;

/**
 * Creates a client bound to the given RPC layer.
 *
 * @param rpc             RPC layer used for all requests issued by this client
 * @param executorService executor kept for asynchronous work done by this client
 */
public OvsDBClientImpl(OvsdbRPC rpc, ExecutorService executorService) {
    // The rpc argument has to be stored: every request method of this class goes through it.
    this.rpc = rpc;
    this.executorService = executorService;
}
65 void setupUpdateListener() {
66 if (rpcCallback == null) {
67 OvsdbRPC.Callback temp = new OvsdbRPC.Callback() {
69 public void update(Object node, UpdateNotification upadateNotification) {
70 Object key = upadateNotification.getContext();
71 CallbackContext callbackContext = monitorCallbacks.get(key);
72 MonitorCallBack monitorCallBack = callbackContext.monitorCallBack;
73 if (monitorCallBack == null) {
75 logger.info("callback received with context {}, but no known handler. Ignoring!", key);
78 _transformingCallback(upadateNotification.getUpdates(), monitorCallBack, callbackContext.schema);
82 public void locked(Object node, List<String> ids) {
87 public void stolen(Object node, List<String> ids) {
91 this.rpcCallback = temp;
92 rpc.registerCallback(temp);
97 protected void _transformingCallback(JsonNode tableUpdatesJson, MonitorCallBack monitorCallBack, DatabaseSchema dbSchema) {
98 //todo(ashwin): we should move all the JSON parsing logic to a utility class
99 if (tableUpdatesJson instanceof ObjectNode) {
100 Map<String, TableUpdate> tableUpdateMap = Maps.newHashMap();
101 ObjectNode updatesJson = (ObjectNode) tableUpdatesJson;
102 for (Iterator<Map.Entry<String,JsonNode>> itr = updatesJson.fields(); itr.hasNext();){
103 Map.Entry<String, JsonNode> entry = itr.next();
105 DatabaseSchema databaseSchema = this.schema.get(dbSchema.getName());
106 TableSchema table = databaseSchema.table(entry.getKey(), TableSchema.class);
107 tableUpdateMap.put(entry.getKey(), table.updatesFromJson(entry.getValue()));
110 TableUpdates updates = new TableUpdates(tableUpdateMap);
111 monitorCallBack.update(updates);
116 public ListenableFuture<List<OperationResult>> transact(List<Operation> operations) {
118 //todo, we may not need transactionbuilder if we can have JSON objects
119 TransactBuilder builder = new TransactBuilder();
120 for (Operation o : operations) {
121 builder.addOperation(o);
124 return rpc.transact(builder);
128 public <E extends TableSchema<E>> MonitorHandle monitor(final DatabaseSchema dbSchema,
129 List<MonitorRequest<E>> monitorRequest,
130 final MonitorCallBack callback) {
132 final ImmutableMap<String, MonitorRequest<E>> reqMap = Maps.uniqueIndex(monitorRequest,
133 new Function<MonitorRequest<E>, String>() {
135 public String apply(MonitorRequest<E> input) {
136 return input.getTableName();
140 final MonitorHandle monitorHandle = new MonitorHandle(UUID.randomUUID().toString());
141 registerCallback(monitorHandle, callback, dbSchema);
143 ListenableFuture<JsonNode> monitor = rpc.monitor(new Params() {
145 public List<Object> params() {
146 return Lists.<Object>newArrayList(dbSchema.getName(), monitorHandle.getId(), reqMap);
149 Futures.addCallback(monitor, new FutureCallback<JsonNode>() {
151 public void onSuccess(JsonNode result) {
152 _transformingCallback(result, callback, dbSchema);
156 public void onFailure(Throwable t) {
157 callback.exception(t);
161 return monitorHandle;
164 private void registerCallback(MonitorHandle monitorHandle, MonitorCallBack callback, DatabaseSchema schema) {
165 this.monitorCallbacks.put(monitorHandle.getId(), new CallbackContext(callback, schema));
166 setupUpdateListener();
170 public void cancelMonitor(MonitorHandle handler) {
171 throw new UnsupportedOperationException("not yet implemented");
175 public void lock(String lockId, LockAquisitionCallback lockedCallBack, LockStolenCallback stolenCallback) {
176 throw new UnsupportedOperationException("not yet implemented");
180 public ListenableFuture<Boolean> steal(String lockId) {
181 throw new UnsupportedOperationException("not yet implemented");
185 public ListenableFuture<Boolean> unLock(String lockId) {
186 throw new UnsupportedOperationException("not yet implemented");
190 public void startEchoService(EchoServiceCallbackFilters callbackFilters) {
191 throw new UnsupportedOperationException("not yet implemented");
195 public void stopEchoService() {
196 throw new UnsupportedOperationException("not yet implemented");
200 public TransactionBuilder transactBuilder() {
201 return new TransactionBuilder(this);
205 public boolean isReady(int timeout) throws InterruptedException {
206 while (timeout > 0) {
207 if (!schema.isEmpty()) {
217 public ListenableFuture<List<String>> getDatabases() {
218 return rpc.list_dbs();
222 public ListenableFuture<DatabaseSchema> getSchema(final String database, final boolean cacheResult) {
224 DatabaseSchema databaseSchema = schema.get(database);
226 if (databaseSchema == null || cacheResult) {
227 return Futures.transform(
228 getSchemaFromDevice(Lists.newArrayList(database)),
229 new Function<Map<String, DatabaseSchema>, DatabaseSchema>() {
231 public DatabaseSchema apply(Map<String, DatabaseSchema> result) {
232 if (result.containsKey(database)) {
233 DatabaseSchema s = result.get(database);
234 s.populateInternallyGeneratedColumns();
236 OvsDBClientImpl.this.schema.put(database, s);
247 return Futures.immediateFuture(databaseSchema);
251 private ListenableFuture<Map<String, DatabaseSchema>> getSchemaFromDevice(final List<String> dbNames) {
252 Map<String, DatabaseSchema> schema = Maps.newHashMap();
253 SettableFuture<Map<String, DatabaseSchema>> future = SettableFuture.create();
254 _populateSchema(dbNames, schema, future);
258 private void _populateSchema(final List<String> dbNames,
259 final Map<String, DatabaseSchema> schema,
260 final SettableFuture<Map<String, DatabaseSchema>> sfuture) {
262 if (dbNames == null || dbNames.isEmpty()) {
266 Futures.transform(rpc.get_schema(Lists.newArrayList(dbNames.get(0))),
267 new com.google.common.base.Function<JsonNode, Void>() {
269 public Void apply(JsonNode jsonNode) {
271 schema.put(dbNames.get(0), DatabaseSchema.fromJson(dbNames.get(0), jsonNode));
272 if (schema.size() > 1 && !sfuture.isCancelled()) {
273 _populateSchema(dbNames.subList(1, dbNames.size()), schema, sfuture);
274 } else if (schema.size() == 1) {
277 } catch (Throwable e) {
278 sfuture.setException(e);
285 public void setRpc(OvsdbRPC rpc) {
289 public Queue<Throwable> getExceptions() {
293 static class CallbackContext {
294 MonitorCallBack monitorCallBack;
295 DatabaseSchema schema;
297 CallbackContext(MonitorCallBack monitorCallBack, DatabaseSchema schema) {
298 this.monitorCallBack = monitorCallBack;
299 this.schema = schema;