/
SpannerPool.java
490 lines (453 loc) · 18.7 KB
/
SpannerPool.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.spanner.connection;
import com.google.api.core.ApiFunction;
import com.google.cloud.NoCredentials;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SessionPoolOptions;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.SpannerOptions;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.base.Ticker;
import com.google.common.collect.Iterables;
import io.grpc.ManagedChannelBuilder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;
/**
* Pool for keeping track of {@link Spanner} instances needed for connections.
*
* <p>When a connection is opened for a Google Cloud Spanner database, a {@link Spanner} object can
* be opened in the background. The {@link SpannerPool} keeps track of which {@link Spanner} objects
* have been opened by connections during the lifetime of the JVM, which connections are still
* opened and closed, and which {@link Spanner} objects could be closed.
*
* <p>Call the method {@link SpannerPool#closeSpannerPool()} at the end of your application to
* gracefully shutdown all instances in the pool.
*/
public class SpannerPool {
// TODO: create separate Client Lib Token for the Connection API.
/**
 * Client lib token that is set on the {@link SpannerOptions} builder when the pool key does not
 * carry a user agent of its own.
 */
private static final String CONNECTION_API_CLIENT_LIB_TOKEN = "sp-jdbc";
/** Logger used by this pool and its background tasks. */
private static final Logger logger = Logger.getLogger(SpannerPool.class.getName());
/**
 * Shuts down the default {@link SpannerPool}, closing every {@link Spanner} instance that was
 * opened on behalf of a connection and is still open. Invoke this once, at the end of your
 * application, for a graceful shutdown of all pooled {@link Spanner} instances. If you do not call
 * it, your application will keep running for 60 seconds after the last {@link
 * java.sql.Connection} to Cloud Spanner has been closed, as that is the default timeout before
 * the {@link SpannerPool} closes unused {@link Spanner} instances.
 *
 * @throws SpannerException if the pool still tracks connections that have not been closed
 */
public static void closeSpannerPool() {
  final SpannerPool defaultPool = INSTANCE;
  // Open connections are treated as an error here rather than merely logged.
  defaultPool.checkAndCloseSpanners(CheckAndCloseSpannersMode.ERROR);
}
/**
 * Default number of milliseconds (60 seconds) that a {@link Spanner} must have been without any
 * connection before it is closed. This is the value the default pool {@link #INSTANCE} is created
 * with.
 */
private static final long DEFAULT_CLOSE_SPANNER_AFTER_MILLISECONDS_UNUSED = 60000L;
/**
 * The default, JVM-wide pool. It is created with the default unused timeout and the system
 * ticker, and is the pool that {@link #closeSpannerPool()} operates on.
 */
static final SpannerPool INSTANCE =
new SpannerPool(DEFAULT_CLOSE_SPANNER_AFTER_MILLISECONDS_UNUSED, Ticker.systemTicker());
/**
 * Determines how {@link #checkAndCloseSpanners(CheckAndCloseSpannersMode)} reacts when it finds
 * {@link Spanner} instances that still have open connections.
 */
@VisibleForTesting
enum CheckAndCloseSpannersMode {
/** Log a warning about the open connections and continue closing the unused instances. */
WARN,
/** Throw a {@link SpannerException} and do not close any instances. */
ERROR;
}
/**
 * Task that closes the pool in {@link CheckAndCloseSpannersMode#WARN} mode. Any failure is
 * reported at {@link Level#FINE} and otherwise suppressed, so that it never propagates out of the
 * thread that runs this task.
 */
private final class CloseSpannerRunnable implements Runnable {
  @Override
  public void run() {
    try {
      checkAndCloseSpanners(CheckAndCloseSpannersMode.WARN);
    } catch (Exception e) {
      // Best effort only: closing must not fail the calling thread, but the cause should still be
      // discoverable, in the same way as for the scheduled closer task.
      logger.log(Level.FINE, "Call to checkAndCloseSpanners failed", e);
    }
  }
}
/**
 * Task that closes the {@link Spanner} instances of the enclosing pool that have been unused for
 * longer than the timeout configured for the pool. Anything thrown while doing so is logged at
 * {@link Level#FINE} and not rethrown.
 */
private final class CloseUnusedSpannersRunnable implements Runnable {
  @Override
  public void run() {
    final long idleThresholdMillis = SpannerPool.this.closeSpannerAfterMillisecondsUnused;
    try {
      SpannerPool.this.closeUnusedSpanners(idleThresholdMillis);
    } catch (Throwable t) {
      logger.log(Level.FINE, "Scheduled call to closeUnusedSpanners failed", t);
    }
  }
}
/**
 * Key that identifies the credentials of a connection. Two keys are equal when the objects they
 * wrap are equal.
 */
static class CredentialsKey {
  /** Wrapped when the options carry no OAuth token, fixed credentials or credentials URL. */
  static final Object DEFAULT_CREDENTIALS_KEY = new Object();

  /** The object that identifies the credentials; never {@code null}. */
  final Object key;

  /**
   * Creates a key from the first non-null value of, in this order: the OAuth token, the fixed
   * credentials and the credentials URL of the given options. If all three are {@code null}, the
   * shared {@link #DEFAULT_CREDENTIALS_KEY} is used.
   *
   * @param options the connection options to derive the credentials key from
   * @return a key wrapping the selected credentials identifier
   */
  static CredentialsKey create(ConnectionOptions options) {
    return new CredentialsKey(
        Iterables.find(
            Arrays.asList(
                options.getOAuthToken(),
                options.getFixedCredentials(),
                options.getCredentialsUrl(),
                DEFAULT_CREDENTIALS_KEY),
            Predicates.notNull()));
  }

  private CredentialsKey(Object key) {
    this.key = Preconditions.checkNotNull(key);
  }

  @Override
  public int hashCode() {
    return key.hashCode();
  }

  @Override
  public boolean equals(Object o) {
    return (o instanceof CredentialsKey && Objects.equals(((CredentialsKey) o).key, this.key));
  }
}
/**
 * Key under which a {@link Spanner} instance is pooled. It captures the properties of the {@link
 * ConnectionOptions} that determine whether two connections can share one {@link Spanner}
 * instance: host, project, credentials, session pool options, number of channels, plain text
 * usage and user agent.
 */
static class SpannerPoolKey {
  private final String host;
  private final String projectId;
  private final CredentialsKey credentialsKey;
  private final SessionPoolOptions sessionPoolOptions;
  private final Integer numChannels;
  private final boolean usePlainText;
  private final String userAgent;

  /** Creates the pool key for the given connection options. */
  private static SpannerPoolKey of(ConnectionOptions options) {
    return new SpannerPoolKey(options);
  }

  private SpannerPoolKey(ConnectionOptions options) {
    host = options.getHost();
    projectId = options.getProjectId();
    credentialsKey = CredentialsKey.create(options);
    sessionPoolOptions = options.getSessionPoolOptions();
    numChannels = options.getNumChannels();
    usePlainText = options.isUsePlainText();
    userAgent = options.getUserAgent();
  }

  /** Two keys are equal when all of their properties are equal. */
  @Override
  public boolean equals(Object o) {
    if (!(o instanceof SpannerPoolKey)) {
      return false;
    }
    final SpannerPoolKey that = (SpannerPoolKey) o;
    if (!Objects.equals(host, that.host)) {
      return false;
    }
    if (!Objects.equals(projectId, that.projectId)) {
      return false;
    }
    if (!Objects.equals(credentialsKey, that.credentialsKey)) {
      return false;
    }
    if (!Objects.equals(sessionPoolOptions, that.sessionPoolOptions)) {
      return false;
    }
    if (!Objects.equals(numChannels, that.numChannels)) {
      return false;
    }
    return usePlainText == that.usePlainText && Objects.equals(userAgent, that.userAgent);
  }

  /** Combines all properties, in declaration order, into one hash code. */
  @Override
  public int hashCode() {
    final Object[] properties = {
      host, projectId, credentialsKey, sessionPoolOptions, numChannels, usePlainText, userAgent
    };
    return Arrays.hashCode(properties);
  }
}
/**
 * The management threads of a {@link SpannerPool} are lazily initialized to prevent unnecessary
 * threads from being created when the connection API is not used. This flag is set by {@code
 * initialize()}, which runs on the first call to {@code getSpanner} while the flag is {@code
 * false}, and it is reset to {@code false} whenever {@code checkAndCloseSpanners} has run.
 */
private boolean initialized = false;
/**
 * Thread that will be run as a shutdown hook on closing the application. This thread will close
 * any Spanner instances opened by the Connection API that are still open and that no longer have
 * any open connections.
 */
private Thread shutdownThread = null;
/**
 * Keep unused {@link Spanner} instances open and in the pool for this duration (in milliseconds)
 * after all its {@link Connection}s have been closed. This prevents unnecessary opening and
 * closing of {@link Spanner} instances. No background closer is scheduled when this value is not
 * greater than zero.
 */
private final long closeSpannerAfterMillisecondsUnused;
/**
 * Executor that periodically runs the task that closes all {@link Spanner} objects that have not
 * been used for an open connection for at least {@link #closeSpannerAfterMillisecondsUnused}
 * milliseconds. It is only created when that value is greater than zero, and it is shut down by
 * {@code checkAndCloseSpanners}.
 */
private ScheduledExecutorService closerService;
/** The pooled {@link Spanner} instances, keyed by the connection properties they were opened for. */
@GuardedBy("this")
private final Map<SpannerPoolKey, Spanner> spanners = new HashMap<>();
/** The connections that have been registered for each pool key and not yet been removed. */
@GuardedBy("this")
private final Map<SpannerPoolKey, List<ConnectionImpl>> connections = new HashMap<>();
/**
 * Keep track of the moment that the last connection for a specific {@link SpannerPoolKey} was
 * closed, so that we can use this to determine whether a {@link Spanner} instance should be
 * closed and removed from the pool. As {@link Spanner} instances are expensive to create and
 * close, we do not want to do that unnecessarily. By adding a delay between the moment the last
 * {@link Connection} for a {@link Spanner} was closed and the moment we close the {@link Spanner}
 * instance, we prevent applications that open one or more connections for a process and close all
 * these connections at the end of the process from getting a severe performance penalty from
 * opening and closing {@link Spanner} instances all the time.
 *
 * <p>{@link Spanner} instances are closed and removed from the pool when the last connection was
 * closed more than {@link #closeSpannerAfterMillisecondsUnused} milliseconds ago. The values are
 * readings of {@link #ticker} converted to milliseconds. A key is absent from this map for as long
 * as at least one connection is registered for it.
 */
@GuardedBy("this")
private final Map<SpannerPoolKey, Long> lastConnectionClosedAt = new HashMap<>();
/** Time source for this pool. Its nanosecond readings are converted to milliseconds before use. */
private final Ticker ticker;
/**
 * Creates a pool with an unused timeout of zero milliseconds. With that timeout no background
 * closer task is scheduled when the pool is initialized.
 *
 * @param ticker the time source the pool uses to determine how long a {@link Spanner} has been
 *     unused
 */
@VisibleForTesting
SpannerPool(Ticker ticker) {
this(0L, ticker);
}
/**
 * Creates a pool. No threads are started and no shutdown hook is registered by the constructor;
 * that happens lazily on the first call to {@code getSpanner}.
 *
 * @param closeSpannerAfterMillisecondsUnused the number of milliseconds a {@link Spanner} must
 *     have been without connections before the background closer closes it; the closer is only
 *     scheduled when this value is greater than zero
 * @param ticker the time source the pool uses to determine how long a {@link Spanner} has been
 *     unused
 */
@VisibleForTesting
SpannerPool(long closeSpannerAfterMillisecondsUnused, Ticker ticker) {
this.closeSpannerAfterMillisecondsUnused = closeSpannerAfterMillisecondsUnused;
this.ticker = ticker;
}
/**
 * Returns the {@link Spanner} instance to use for a connection with the properties in the given
 * {@link ConnectionOptions}. An instance that was already opened for the same properties is
 * reused; otherwise a new one is created and added to the pool. The connection is registered with
 * the pool, so that the pool knows that the returned instance is in use.
 *
 * <p>The first call also initializes the management facilities of this pool.
 *
 * @param options The specification of the Spanner database to connect to.
 * @param connection The {@link ConnectionImpl} that will use the returned instance. It is tracked
 *     by the pool to know when a {@link Spanner} object can be closed.
 * @return an opened {@link Spanner} object that can be used by a connection to communicate with
 *     the Spanner database.
 */
Spanner getSpanner(ConnectionOptions options, ConnectionImpl connection) {
  Preconditions.checkNotNull(options);
  Preconditions.checkNotNull(connection);
  final SpannerPoolKey poolKey = SpannerPoolKey.of(options);
  synchronized (this) {
    if (!initialized) {
      initialize();
    }
    Spanner pooled = spanners.get(poolKey);
    if (pooled == null) {
      pooled = createSpanner(poolKey, options);
      spanners.put(poolKey, pooled);
    }
    List<ConnectionImpl> users = connections.get(poolKey);
    if (users == null) {
      users = new ArrayList<>();
      connections.put(poolKey, users);
    }
    users.add(connection);
    // The instance is in use again, so it is no longer a candidate for being closed.
    lastConnectionClosedAt.remove(poolKey);
    return pooled;
  }
}
/**
 * Sets up the management facilities of this pool: a JVM shutdown hook that closes the pool, and,
 * if the unused timeout is greater than zero, a single daemon thread that periodically closes
 * {@link Spanner} instances that have been unused for longer than that timeout.
 *
 * <p>This method runs again after the pool has been closed and is used once more. The shutdown
 * hook is therefore registered only the first time; it is never removed, and registering a new
 * one on every re-initialization would accumulate hook threads for the lifetime of the JVM.
 */
private void initialize() {
  if (shutdownThread == null) {
    shutdownThread = new Thread(new CloseSpannerRunnable(), "SpannerPool shutdown hook");
    Runtime.getRuntime().addShutdownHook(shutdownThread);
  }
  if (this.closeSpannerAfterMillisecondsUnused > 0) {
    // A previous closer service has been shut down when the pool was closed, so a fresh one is
    // needed on each initialization.
    this.closerService =
        Executors.newSingleThreadScheduledExecutor(
            new ThreadFactory() {
              @Override
              public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "close-unused-spanners-worker");
                thread.setDaemon(true);
                return thread;
              }
            });
    this.closerService.scheduleAtFixedRate(
        new CloseUnusedSpannersRunnable(),
        this.closeSpannerAfterMillisecondsUnused,
        this.closeSpannerAfterMillisecondsUnused,
        TimeUnit.MILLISECONDS);
  }
  initialized = true;
}
/**
 * Builds a new {@link Spanner} instance for the given pool key.
 *
 * <p>The client lib token is the user agent of the key, or the default connection API token when
 * the key has none. When the key specifies plain text, the credentials are replaced by {@link
 * NoCredentials} and the channel is configured for plain text. A configurator set on the options
 * is applied last, so it sees all other settings on the builder.
 *
 * @param key the pool key holding the properties for the new instance
 * @param options the connection options supplying the credentials and the optional configurator
 * @return the newly created {@link Spanner} instance
 */
@SuppressWarnings("rawtypes")
@VisibleForTesting
Spanner createSpanner(SpannerPoolKey key, ConnectionOptions options) {
  final String clientLibToken =
      key.userAgent != null ? key.userAgent : CONNECTION_API_CLIENT_LIB_TOKEN;
  final SpannerOptions.Builder spannerOptions = SpannerOptions.newBuilder();
  spannerOptions
      .setClientLibToken(clientLibToken)
      .setHost(key.host)
      .setProjectId(key.projectId)
      .setCredentials(options.getCredentials());
  spannerOptions.setSessionPoolOption(key.sessionPoolOptions);
  if (key.numChannels != null) {
    spannerOptions.setNumChannels(key.numChannels);
  }
  if (key.usePlainText) {
    // Credentials may not be sent over a plain text channel.
    spannerOptions.setCredentials(NoCredentials.getInstance());
    // A custom channel configurator is needed to allow http instead of https.
    final ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> plainTextConfigurator =
        new ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder>() {
          @Override
          public ManagedChannelBuilder apply(ManagedChannelBuilder channelBuilder) {
            channelBuilder.usePlaintext();
            return channelBuilder;
          }
        };
    spannerOptions.setChannelConfigurator(plainTextConfigurator);
  }
  if (options.getConfigurator() != null) {
    options.getConfigurator().configure(spannerOptions);
  }
  return spannerOptions.build().getService();
}
/**
 * Stops tracking the given {@link ConnectionImpl} in this pool. When it was the last connection
 * registered for its {@link Spanner} instance, the current time is recorded, so that the pool can
 * later determine for how long the instance has been unused.
 *
 * <p>A warning is logged, and nothing else happens, when no {@link Spanner} instance or no such
 * connection is registered for the given options.
 *
 * @param options The {@link ConnectionOptions} that were used to create the connection.
 * @param connection The {@link ConnectionImpl} to remove from this pool.
 */
void removeConnection(ConnectionOptions options, ConnectionImpl connection) {
  Preconditions.checkNotNull(options);
  Preconditions.checkNotNull(connection);
  final SpannerPoolKey poolKey = SpannerPoolKey.of(options);
  synchronized (this) {
    if (!spanners.containsKey(poolKey) || !connections.containsKey(poolKey)) {
      logger.log(
          Level.WARNING,
          "There is no Spanner registered for ConnectionOptions " + options.toString());
      return;
    }
    final List<ConnectionImpl> users = connections.get(poolKey);
    final boolean wasRegistered = users != null && users.remove(connection);
    if (!wasRegistered) {
      logger.log(
          Level.WARNING,
          "There are no connections registered for ConnectionOptions " + options.toString());
      return;
    }
    if (users.isEmpty()) {
      // This was the last connection for this key. Remember when it was closed, so we know which
      // Spanner objects we could close.
      final long nowMillis = TimeUnit.MILLISECONDS.convert(ticker.read(), TimeUnit.NANOSECONDS);
      lastConnectionClosedAt.put(poolKey, nowMillis);
    }
  }
}
/**
 * Checks that there are no {@link Connection}s that have been created by this {@link SpannerPool}
 * that are still open, and then closes all {@link Spanner} instances in the pool. If there is at
 * least one unclosed {@link Connection} left in the pool, the method will throw a {@link
 * SpannerException} and no {@link Spanner} instances will be closed.
 *
 * <p>This is equivalent to calling {@link #checkAndCloseSpanners(CheckAndCloseSpannersMode)} with
 * {@link CheckAndCloseSpannersMode#ERROR}.
 *
 * @throws SpannerException with {@link ErrorCode#FAILED_PRECONDITION} if at least one {@link
 *     Spanner} instance in the pool still has open connections
 */
void checkAndCloseSpanners() {
checkAndCloseSpanners(CheckAndCloseSpannersMode.ERROR);
}
/**
 * Closes the {@link Spanner} instances in this pool that no longer have any open connections,
 * after checking whether there are instances that are still in use.
 *
 * <p>Connections that are still open are reported through the logger. In {@link
 * CheckAndCloseSpannersMode#WARN} mode a warning is logged and the unused instances are closed
 * anyway; in any other mode a {@link SpannerException} is thrown and nothing is closed. In all
 * cases the background closer is shut down and the pool is marked as uninitialized, so that it is
 * initialized again the next time it is used.
 *
 * @param mode how to react to {@link Spanner} instances that still have open connections
 * @throws SpannerException with {@link ErrorCode#FAILED_PRECONDITION} if instances are still in
 *     use and the mode is not {@link CheckAndCloseSpannersMode#WARN}
 */
@VisibleForTesting
void checkAndCloseSpanners(CheckAndCloseSpannersMode mode) {
  synchronized (this) {
    // An instance without a registered closing time still has at least one open connection.
    final List<SpannerPoolKey> activeKeys = new ArrayList<>();
    for (SpannerPoolKey poolKey : spanners.keySet()) {
      if (!lastConnectionClosedAt.containsKey(poolKey)) {
        activeKeys.add(poolKey);
      }
    }
    try {
      if (!activeKeys.isEmpty()) {
        logLeakedConnections(activeKeys);
        if (mode != CheckAndCloseSpannersMode.WARN) {
          throw SpannerExceptionFactory.newSpannerException(
              ErrorCode.FAILED_PRECONDITION,
              "There is/are "
                  + activeKeys.size()
                  + " connection(s) still open. Close all connections before calling closeSpanner()");
        }
        logger.log(
            Level.WARNING,
            "There is/are "
                + activeKeys.size()
                + " connection(s) still open."
                + " Close all connections before stopping the application");
      }
      // Long.MIN_VALUE is a threshold that is always less than the difference between the current
      // time and the close time of a connection, so every unused instance is closed.
      closeUnusedSpanners(Long.MIN_VALUE);
    } finally {
      if (closerService != null) {
        closerService.shutdown();
      }
      initialized = false;
    }
  }
}
/**
 * Logs a warning for each connection registered under one of the given keys that is not closed
 * and that carries a leaked-connection exception.
 *
 * @param keysStillInUse the pool keys whose registered connections should be inspected
 */
private void logLeakedConnections(List<SpannerPoolKey> keysStillInUse) {
  synchronized (this) {
    for (SpannerPoolKey poolKey : keysStillInUse) {
      final List<ConnectionImpl> registered = connections.get(poolKey);
      for (ConnectionImpl connection : registered) {
        if (connection.isClosed()) {
          continue;
        }
        if (connection.getLeakedException() == null) {
          continue;
        }
        logger.log(Level.WARNING, "Leaked connection", connection.getLeakedException());
      }
    }
  }
}
/**
 * Closes Spanner objects that are no longer in use by connections, and where the last connection
 * that used it was closed more than <code>closeSpannerAfterMillisecondsUnused</code> milliseconds
 * ago. The delay ensures that Spanner objects are not closed unless there's a good reason for it.
 *
 * <p>Every instance that qualifies is removed from the pool and closed, also when closing one of
 * them fails: a failure does not prevent the remaining instances from being closed, and it does
 * not leave stale bookkeeping behind. The first failure is rethrown once all instances have been
 * handled, with any later failures attached as suppressed exceptions.
 *
 * @param closeSpannerAfterMillisecondsUnused The number of milliseconds a {@link Spanner} object
 *     should not have been used for a {@link Connection} before it is closed by this method.
 * @throws RuntimeException the first exception thrown while closing a {@link Spanner} instance
 */
@VisibleForTesting
void closeUnusedSpanners(long closeSpannerAfterMillisecondsUnused) {
  RuntimeException firstFailure = null;
  synchronized (this) {
    // Collect the expired keys first, as the map may not be modified while iterating over it.
    List<SpannerPoolKey> expiredKeys = new ArrayList<>();
    for (Entry<SpannerPoolKey, Long> entry : lastConnectionClosedAt.entrySet()) {
      Long closedAt = entry.getValue();
      if (closedAt == null) {
        continue;
      }
      long unusedMillis =
          TimeUnit.MILLISECONDS.convert(ticker.read(), TimeUnit.NANOSECONDS)
              - closedAt.longValue();
      if (unusedMillis > closeSpannerAfterMillisecondsUnused) {
        expiredKeys.add(entry.getKey());
      }
    }
    for (SpannerPoolKey key : expiredKeys) {
      // Remove the bookkeeping before closing: even if the close operation fails, the spanner
      // object is no longer valid and must not stay in the pool.
      lastConnectionClosedAt.remove(key);
      Spanner spanner = spanners.remove(key);
      if (spanner == null) {
        continue;
      }
      try {
        spanner.close();
      } catch (RuntimeException e) {
        if (firstFailure == null) {
          firstFailure = e;
        } else {
          firstFailure.addSuppressed(e);
        }
      }
    }
  }
  if (firstFailure != null) {
    throw firstFailure;
  }
}
/** Returns the number of {@link Spanner} instances that are currently in this pool. */
@VisibleForTesting
int getCurrentSpannerCount() {
  final int pooledSpanners;
  synchronized (this) {
    pooledSpanners = spanners.size();
  }
  return pooledSpanners;
}
}