This repository has been archived by the owner on Sep 3, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 250
/
manager.ts
304 lines (267 loc) · 11.3 KB
/
manager.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
/*
* Copyright 2014 Google Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
/// <reference path="../../../../../../externs/ts/node/socket.io.d.ts" />
/// <reference path="../../../../../../externs/ts/node/node-uuid.d.ts" />
import conn = require('./connection');
import messages = require('../shared/messages');
import sessions = require('./session');
import socketio = require('socket.io');
import uuid = require('node-uuid');
import util = require('../common/util');
/**
* Manages the lifecycles of a set of sessions between users and kernels.
*
* When a user connects, the session manager specifies which session the user should
* join, which may involve creating a new session or joining an existing one.
*
* In short, the session manager contains all of the business logic for how to bind together
* users and kernels into session objects.
*/
export class SessionManager implements app.ISessionManager {

  /** Connected clients across all sessions, keyed by connection id. */
  _connectionIdToConnection: app.Map<app.IClientConnection>;
  /** The session each connected client is bound to, keyed by connection id. */
  _connectionIdToSession: app.Map<app.ISession>;
  /** Tracked sessions, keyed by the session (resource) path they were created for. */
  _sessionPathToSession: app.Map<app.ISession>;

  _kernelManager: app.IKernelManager;
  _messageProcessors: app.MessageProcessor[];
  _notebookStorage: app.INotebookStorage;
  _socketioManager: socketio.SocketManager;

  /**
   * Stores the collaborators and subscribes to socket.io connection events.
   *
   * @param kernelManager Passed through to every session created by this manager.
   * @param messageProcessors Ordered chain of processors applied to each session message.
   * @param notebookStorage Passed through to every session created by this manager.
   * @param socketioManager Source of 'connection' events for newly connected clients.
   */
  constructor (
      kernelManager: app.IKernelManager,
      messageProcessors: app.MessageProcessor[],
      notebookStorage: app.INotebookStorage,
      socketioManager: socketio.SocketManager) {

    this._kernelManager = kernelManager;
    this._messageProcessors = messageProcessors;
    this._notebookStorage = notebookStorage;
    this._socketioManager = socketioManager;

    // The maps are keyed by client-supplied strings (session paths, connection ids). Use
    // prototype-less objects so that keys such as "constructor" or "toString" can never resolve
    // to members inherited from Object.prototype and be mistaken for a tracked entry.
    this._connectionIdToSession = Object.create(null);
    this._connectionIdToConnection = Object.create(null);
    this._sessionPathToSession = Object.create(null);

    this._registerHandlers();
  }

  /**
   * Asynchronously creates a session for the given resource path.
   *
   * Idempotent. Subsequent calls to create for a pre-existing session have no effect other than
   * asynchronously passing the existing session to the callback.
   *
   * Note: the session is tracked before its startup completes, so a second call made while the
   * first is still starting receives the same (possibly not yet started) session.
   *
   * @param sessionPath The resource (e.g., notebook) path for which to create a session.
   * @param callback Completion callback for handling the outcome of the session creation flow.
   */
  create(sessionPath: string, callback: app.Callback<app.ISession>) {
    // Retrieve an existing session for the specified session path if it exists.
    var session = this._sessionPathToSession[sessionPath];
    if (session) {
      // Session already exists, so just signal completion (always asynchronously).
      process.nextTick(callback.bind(null, null, session));
      return;
    }

    // Create a new session since one did not already exist.
    session = new sessions.Session(
        sessionPath,
        this._kernelManager,
        this._handleMessage.bind(this),
        this._notebookStorage);

    // Track the session by path.
    this._sessionPathToSession[sessionPath] = session;

    // Start the session and provide the completion callback to be invoked when the session is
    // fully initialized.
    session.start((error) => {
      if (error) {
        // If an error occurred when starting the session, immediately shut the session down
        // to clean up any resources.
        this.shutdown(session.path, (shutdownError) => {
          // Only report a shutdown failure when one actually happened.
          if (shutdownError) {
            console.log('Failed to shutdown session:', shutdownError);
          }
          // Pass the original session startup error back to the caller.
          callback(error);
        });
        return;
      }

      // Pass the newly created session back to the caller.
      callback(null, session);
    });
  }

  /**
   * Gets a session by its path if it exists.
   *
   * @param sessionPath The session path to get.
   * @return A session or null if the path was not found.
   */
  get(sessionPath: string): app.ISession {
    return this._sessionPathToSession[sessionPath] || null;
  }

  /**
   * Gets the list of sessions currently managed by this instance.
   *
   * @return The set of active sessions.
   */
  list(): app.ISession[] {
    return Object.keys(this._sessionPathToSession).map((sessionPath) => {
      return this._sessionPathToSession[sessionPath];
    });
  }

  /**
   * Asynchronously shuts down the session tracked under the given path.
   *
   * The session is only untracked once its own shutdown reports success; if the shutdown fails
   * the session remains tracked and the error is passed to the callback.
   *
   * @param sessionPath The path of the session to shut down.
   * @param callback Invoked with an error if the path is not tracked or the session shutdown
   *     failed; invoked with null on success.
   */
  shutdown(sessionPath: string, callback: app.Callback<void>) {
    // Retrieve the session that should be shut down.
    var session = this._sessionPathToSession[sessionPath];
    if (!session) {
      process.nextTick(callback.bind(null,
          util.createError('Session path "%s" does not exist', sessionPath)));
      // Nothing to shut down. Returning here is essential: falling through would dereference
      // the missing session and throw.
      return;
    }

    // Ask the session to shutdown asynchronously.
    session.shutdown((error: Error) => {
      // Verify that the shutdown was successful.
      if (error) {
        // Shutdown failed, so pass error to caller.
        callback(error);
        return;
      }

      // Now that the session has been shutdown, untrack it.
      delete this._sessionPathToSession[sessionPath];

      // Done with shutdown, so invoke the completion callback.
      callback(null);
    });
  }

  /**
   * Gets the metadata provided during the connection establishment.
   *
   * Note: a notebook rename causes the notebook path to be changed (at the session level)
   * but that change is not reflected in the return value of this method. That is because
   * this method always returns the value of the notebook path at the time of the connection
   * establishment; i.e., whatever notebook path was part of the original handshake data.
   *
   * So, only assume the notebook path returned here to match the session notebook path at the
   * time of connection establishment.
   *
   * @param socket The socket.io connection whose handshake query is read.
   * @return The connection data extracted from the handshake.
   */
  _getConnectionData(socket: socketio.Socket): app.ClientConnectionData {
    return {
      notebookPath: socket.handshake.query.notebookPath
    };
  }

  /**
   * Receives and processes all messages flowing through all sessions owned by this instance.
   *
   * Session objects that pass control to this method also supply a "next action" callback for
   * returning control to the session after the middleware stack has had an opportunity
   * to manipulate a given message.
   *
   * @param message The message to run through the processor chain.
   * @param session The session the message belongs to.
   * @param callback Invoked with the processed message unless a processor filtered it out.
   */
  _handleMessage(message: any, session: app.ISession, callback: app.EventHandler<any>) {
    // Invoke each processor in the chain in order, feeding each one the previous result.
    //
    // If a processor returns null, the message is considered "filtered" and processing
    // of the message stops.
    var processedMessage = message;
    for (var i = 0; i < this._messageProcessors.length; ++i) {
      processedMessage = this._messageProcessors[i](processedMessage, session, this);
      if (processedMessage === null) {
        // Then this message has been filtered, no further processing. The original (unprocessed)
        // message is what gets logged.
        console.log('Filtered: ', JSON.stringify(message));
        return;
      }
    }

    // Return control to the messaging stack via the Session object that corresponds to this
    // message, since the message was not filtered by any of the processors.
    callback(processedMessage);
  }

  /**
   * Passes the session tracked under the given path to the callback, creating the session first
   * if it does not exist yet. The callback is always invoked asynchronously and exactly once.
   *
   * @param sessionPath The session path to look up or create.
   * @param callback Receives the existing or newly created session, or the creation error.
   */
  _getOrCreateSession(sessionPath: string, callback: app.Callback<app.ISession>) {
    // Lookup the session path in the set of active sessions to see if it already exists.
    var session = this._sessionPathToSession[sessionPath];
    if (session) {
      // Session already exists, so just return it. Returning here prevents the callback from
      // being scheduled a second time by create().
      process.nextTick(callback.bind(null, null, session));
      return;
    }

    // No existing session for given session path, so create one.
    this.create(sessionPath, callback);
  }

  /**
   * Binds the new client connection to a session and configures session event handling.
   *
   * If the session for the given connection already exists, the new connection (re)connects to the
   * existing session; otherwise a session is created for the requested path.
   *
   * Two clients that specify the same session path will join the same session.
   *
   * A single client can also specify a previous session path to re-join a previous session,
   * assuming that session is still alive.
   *
   * If the session cannot be obtained, the client is told to terminate the connection.
   *
   * @param socket A socket.io connection.
   */
  _handleClientConnect(socket: socketio.Socket) {
    // Get (or create) the session for the session path specified in the connection handshake.
    var sessionPath = this._getConnectionData(socket).notebookPath;
    this._getOrCreateSession(sessionPath, (error, session) => {
      if (error) {
        // Close the socket connection immediately if the session could not be created.
        //
        // Terminating the connection from the server side is insufficient, because the client will
        // attempt to reconnect. So, send a message on the established connection informing the
        // client that it should close the connection from the client side.
        socket.emit(messages.terminateConnection);
        return;
      }

      var connectionId = socket.id;

      // Avoid duplicate socket.io connect events that can occur for a single (active) connection.
      if (this._connectionIdToSession[connectionId]) {
        // Duplicate connect event for existing connection. Nothing to do.
        return;
      }

      // Delegate all socket.io Action messages to the session.
      var connection = new conn.ClientConnection(
          connectionId,
          socket,
          session.processAction.bind(session),
          this._handleClientDisconnect.bind(this));

      // Update existing session object with new user connection.
      session.addClientConnection(connection);
      console.log('User has connected: ' + connection.id);

      // Store a mapping from connection to the associated session so that the session can be
      // retrieved on disconnect.
      this._connectionIdToSession[connection.id] = session;

      // Store the mapping of connection id => connection to track the set of connected clients
      // across all sessions.
      this._connectionIdToConnection[connection.id] = connection;
    });
  }

  /**
   * Removes the connection upon client disconnect.
   *
   * @param connection A client connection.
   */
  _handleClientDisconnect(connection: app.IClientConnection) {
    // Find the session associated with this connection.
    var session = this._connectionIdToSession[connection.id];
    if (!session) {
      // Could have received a duplicate disconnect event if the session is already untracked.
      //
      // Nothing to do if there is no session mapped to the connection.
      return;
    }

    // Remove the connection from the session.
    session.removeClientConnection(connection);
    console.log('User has disconnected: ' + connection.id);

    // Remove the connection from the index.
    delete this._connectionIdToConnection[connection.id];

    // Remove the connection => session mapping.
    delete this._connectionIdToSession[connection.id];
  }

  /**
   * Subscribes this manager to socket.io 'connection' events.
   */
  _registerHandlers() {
    this._socketioManager.on('connection', this._handleClientConnect.bind(this));
    // Note: disconnect handlers are at the socket/connection level
  }
}