This repository has been archived by the owner on Jan 18, 2022. It is now read-only.
/
WorkerConnector.cs
300 lines (260 loc) · 10.8 KB
/
WorkerConnector.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Improbable.Worker.CInterop;
using Improbable.Worker.CInterop.Alpha;
using Unity.Entities;
using UnityEditor;
using UnityEngine;
namespace Improbable.Gdk.Core
{
/// <summary>
///     Connect workers via Monobehaviours.
/// </summary>
/// <remarks>
///     Subclasses call <see cref="Connect"/> to create the worker. The connector disposes the worker and
///     destroys itself when the application quits, when it is destroyed, or when the worker disconnects.
/// </remarks>
public abstract class WorkerConnector : MonoBehaviour, IDisposable
{
    /// <summary>
    ///     The number of connection attempts before giving up.
    /// </summary>
    public int MaxConnectionAttempts = 3;

    /// <summary>
    ///     Represents a SpatialOS worker.
    /// </summary>
    /// <remarks>
    ///     Only safe to access after the connection has succeeded.
    /// </remarks>
    public WorkerInWorld Worker;

    // Handlers registered through OnWorkerCreationFinished.
    private readonly List<Action<Worker>> workerConnectedCallbacks = new List<Action<Worker>>();

    /// <summary>
    ///     An event that triggers when the worker has been fully created.
    /// </summary>
    /// <remarks>
    ///     If <see cref="Worker"/> is already set when a handler is added, the handler is invoked immediately
    ///     in addition to being registered.
    /// </remarks>
    public event Action<Worker> OnWorkerCreationFinished
    {
        add
        {
            workerConnectedCallbacks.Add(value);
            if (Worker != null)
            {
                value.Invoke(Worker);
            }
        }
        remove => workerConnectedCallbacks.Remove(value);
    }

    // Shared by all connectors so that only one worker goes through the connection flow at a time.
    private static readonly SemaphoreSlim WorkerConnectionSemaphore = new SemaphoreSlim(1, 1);

    // Important: dispose in this step, as otherwise it can interfere with the domain unloading logic.
    protected void OnApplicationQuit()
    {
        Dispose();
    }

    protected void OnDestroy()
    {
        Dispose();
    }

    /// <summary>
    ///     Asynchronously connects a worker to the SpatialOS runtime.
    /// </summary>
    /// <param name="builder">Describes how to create a <see cref="IConnectionHandlerBuilder"/> for this worker.</param>
    /// <param name="logger">The logger for the worker to use.</param>
    /// <returns>
    ///     A task that completes when the connection attempt has finished. Exceptions raised while connecting
    ///     are caught, logged through <paramref name="logger"/> and, while the application is playing, reported
    ///     to <see cref="HandleWorkerConnectionFailure"/>. <see cref="OnWorkerCreationFinished"/> handlers are
    ///     only invoked after a successful connection.
    /// </returns>
    /// <exception cref="ArgumentException">Thrown if <paramref name="builder"/> is null.</exception>
    protected async Task Connect(IConnectionHandlerBuilder builder, ILogDispatcher logger)
    {
        if (builder == null)
        {
            throw new ArgumentException("Builder cannot be null.", nameof(builder));
        }

        // Check that other workers have finished trying to connect before this one starts.
        // This prevents races on the workers starting and races on when we start ticking systems.
        await WorkerConnectionSemaphore.WaitAsync();
        try
        {
            // A check is needed for the case that play mode is exited before the semaphore was released.
            if (!Application.isPlaying)
            {
                return;
            }

            var origin = transform.position;
            Worker = await ConnectWithRetries(builder, MaxConnectionAttempts, logger, builder.WorkerType, origin);
            Worker.OnDisconnect += OnDisconnected;

            // Play mode may have been exited while the connection was being established.
            if (!Application.isPlaying)
            {
                Dispose();
                throw new ConnectionFailedException("Editor application stopped",
                    ConnectionErrorReason.EditorApplicationStopped);
            }

            HandleWorkerConnectionEstablished();

            World.Active = World.Active ?? Worker.World;

            // Update PlayerLoop
            PlayerLoopUtils.ResolveSystemGroups(Worker.World);
            PlayerLoopUtils.AddToPlayerLoop(Worker.World);
        }
        catch (Exception e)
        {
            logger.HandleLog(LogType.Error, new LogEvent("Failed to create worker")
                .WithException(e)
                .WithField("WorkerType", builder.WorkerType)
                .WithField("Message", e.Message)
                .WithField("Stacktrace", e.StackTrace));
#if UNITY_EDITOR
            // Temporary warning to be replaced when we can reliably detect if a local runtime is running, or not.
            logger.HandleLog(LogType.Warning,
                new LogEvent(
                        "Is a local runtime running? If not, you can start one from 'SpatialOS -> Local launch' or by pressing Cmd/Ctrl-L")
                    .WithField("Reason", "A worker running in the Editor failed to connect"));
#endif
            // A check is needed for the case that play mode is exited before the connection can complete.
            if (Application.isPlaying)
            {
                HandleWorkerConnectionFailure(e.Message);
                Dispose();
            }

            // The connection failed, so there is no worker to report: stop here rather than falling through
            // to the post-connection steps below, which would dereference a null Worker and hand null to
            // every OnWorkerCreationFinished handler. The finally block still releases the semaphore.
            return;
        }
        finally
        {
            WorkerConnectionSemaphore.Release();
        }

#if !UNITY_EDITOR && DEVELOPMENT_BUILD && !UNITY_ANDROID && !UNITY_IPHONE
        try
        {
            var port = GetPlayerConnectionPort();
            Worker.SendLogMessage(LogLevel.Info, $"Unity PlayerConnection port: {port}.", Worker.WorkerId, null);
        }
        catch (Exception e)
        {
            logger.HandleLog(LogType.Exception, new LogEvent("Could not find the Unity PlayerConnection port.").WithException(e));
        }
#endif

        // Iterate over a snapshot so that a handler which subscribes or unsubscribes while being notified
        // does not invalidate the enumeration.
        foreach (var callback in workerConnectedCallbacks.ToArray())
        {
            callback(Worker);
        }
    }

    /// <summary>
    ///     Hook called by <see cref="Connect"/> once the worker has been created, before its world is added
    ///     to the player loop. The default implementation does nothing.
    /// </summary>
    protected virtual void HandleWorkerConnectionEstablished()
    {
    }

    /// <summary>
    ///     Hook called by <see cref="Connect"/> when creating the worker failed while the application is
    ///     still playing. The default implementation does nothing.
    /// </summary>
    /// <param name="errorMessage">The message of the exception that caused the failure.</param>
    protected virtual void HandleWorkerConnectionFailure(string errorMessage)
    {
    }

    /// <summary>
    ///     Tries to create a worker, retrying on <see cref="ConnectionFailedException"/> until
    ///     <paramref name="maxAttempts"/> attempts have failed.
    /// </summary>
    /// <param name="connectionHandlerBuilder">The builder passed through to the worker creation call.</param>
    /// <param name="maxAttempts">The number of attempts to make before giving up.</param>
    /// <param name="logger">Receives an error log entry for each failed attempt.</param>
    /// <param name="workerType">The type of the worker to create.</param>
    /// <param name="origin">The origin passed through to the worker creation call.</param>
    /// <returns>The created worker.</returns>
    /// <exception cref="ConnectionFailedException">
    ///     Thrown when the application is no longer playing, or when every attempt has failed.
    /// </exception>
    private static async Task<WorkerInWorld> ConnectWithRetries(IConnectionHandlerBuilder connectionHandlerBuilder, int maxAttempts,
        ILogDispatcher logger, string workerType, Vector3 origin)
    {
        var remainingAttempts = maxAttempts;
        while (remainingAttempts > 0)
        {
            if (!Application.isPlaying)
            {
                throw new ConnectionFailedException("Editor application stopped", ConnectionErrorReason.EditorApplicationStopped);
            }

            try
            {
                using (var tokenSource = new CancellationTokenSource())
                {
                    // Cancel the in-flight connection attempt if the application starts quitting.
                    Action cancelTask = delegate { tokenSource.Cancel(); };
                    Application.quitting += cancelTask;
                    try
                    {
                        return await WorkerInWorld.CreateWorkerInWorldAsync(connectionHandlerBuilder, workerType, logger, origin, tokenSource.Token);
                    }
                    finally
                    {
                        // Always unsubscribe, including when the attempt throws: otherwise the handler stays
                        // registered and would call Cancel() on an already disposed token source on quit.
                        Application.quitting -= cancelTask;
                    }
                }
            }
            catch (ConnectionFailedException e)
            {
                // There is no point retrying once the editor has left play mode.
                if (e.Reason == ConnectionErrorReason.EditorApplicationStopped)
                {
                    throw;
                }

                --remainingAttempts;
                logger.HandleLog(LogType.Error,
                    new LogEvent($"Failed attempt {maxAttempts - remainingAttempts} to create worker")
                        .WithField("WorkerType", workerType)
                        .WithField("Message", e.Message));
            }
        }

        throw new ConnectionFailedException(
            $"Tried to connect {maxAttempts} times - giving up.",
            ConnectionErrorReason.ExceededMaximumRetries);
    }

    /// <summary>
    ///     Creates a worker ID of the form <c>{workerType}-{hex}</c>, where the suffix is the hash code of a
    ///     newly generated <see cref="Guid"/> formatted as lowercase hexadecimal.
    /// </summary>
    /// <param name="workerType">The worker type used as the ID prefix.</param>
    /// <returns>The generated worker ID.</returns>
    protected static string CreateNewWorkerId(string workerType)
    {
        return $"{workerType}-{Guid.NewGuid().GetHashCode():x}";
    }

    /// <summary>
    ///     Creates connection parameters for the given worker type, using the modular KCP network
    ///     connection type with newly constructed upstream and downstream compression parameters.
    /// </summary>
    /// <param name="workerType">The worker type to connect as.</param>
    /// <param name="initializer">
    ///     Optional initializer that is given the parameters to modify before they are returned.
    /// </param>
    /// <returns>The connection parameters.</returns>
    protected ConnectionParameters CreateConnectionParameters(string workerType, IConnectionParameterInitializer initializer = null)
    {
        var @params = new ConnectionParameters
        {
            WorkerType = workerType,
            DefaultComponentVtable = new ComponentVtable(),
            Network =
            {
                ConnectionType = NetworkConnectionType.ModularKcp,
                ModularKcp =
                {
                    DownstreamCompression = new CompressionParameters(),
                    UpstreamCompression = new CompressionParameters(),
                }
            }
        };

        initializer?.Initialize(@params);

        return @params;
    }

    // Handler for Worker.OnDisconnect: logs the disconnect and schedules the worker for disposal.
    private void OnDisconnected(string reason)
    {
        Worker.LogDispatcher.HandleLog(LogType.Log, new LogEvent($"Worker disconnected")
            .WithField("WorkerId", Worker.WorkerId)
            .WithField("Reason", reason));
        StartCoroutine(DeferredDisposeWorker());
    }

    /// <summary>
    ///     Coroutine that removes the worker's world from the player loop, waits one frame and then
    ///     disposes this connector.
    /// </summary>
    protected IEnumerator DeferredDisposeWorker()
    {
        // Remove the world from the loop early, to avoid errors during the delay frame
        RemoveFromPlayerLoop();
        yield return null;
        Dispose();
    }

    /// <summary>
    ///     Removes the worker's world from the player loop, disposes the worker (if any), clears
    ///     <see cref="Worker"/> and destroys this component.
    /// </summary>
    public virtual void Dispose()
    {
        RemoveFromPlayerLoop();
        Worker?.Dispose();
        Worker = null;
        UnityObjectDestroyer.Destroy(this);
    }

    private void RemoveFromPlayerLoop()
    {
        if (Worker?.World != null)
        {
            // Remove root systems from the disposing world from the PlayerLoop
            // This only affects the loop next frame
            PlayerLoopUtils.RemoveFromPlayerLoop(Worker.World);
        }
    }

    // Reads the player's console log file and extracts the PlayerConnection port from it.
    private static ushort GetPlayerConnectionPort()
    {
        // We need to open the File as ReadWrite since this process _already_ has it open as ReadWrite.
        // Attempting to open it as Read only results in IO exceptions due to permissions. Go figure.
        using (var stream = new FileStream(Application.consoleLogPath, FileMode.Open, FileAccess.ReadWrite, FileShare.ReadWrite))
        using (var readStream = new StreamReader(stream))
        {
            var logContents = readStream.ReadToEnd();
            return ExtractPlayerConnectionPort(logContents);
        }
    }

    // Built once and reused: constructing a compiled regex on every call is needlessly expensive.
    private static readonly Regex PlayerConnectionPortRegex = new Regex(
        "PlayerConnection initialized network socket : 0\\.0\\.0\\.0 ([0-9]+)",
        RegexOptions.Compiled);

    /// <summary>
    ///     Extracts the PlayerConnection port from the contents of a log file.
    /// </summary>
    /// <param name="fileContents">The text to search for the "PlayerConnection initialized" line.</param>
    /// <returns>The port captured from the first matching line.</returns>
    /// <exception cref="Exception">
    ///     Thrown if no matching line is found or the port is 0. The exceptions raised by
    ///     <see cref="ushort.Parse(string)"/> propagate if the captured digits do not fit in a
    ///     <see cref="ushort"/>.
    /// </exception>
    internal static ushort ExtractPlayerConnectionPort(string fileContents)
    {
        // Match once and reuse the result instead of scanning the log with IsMatch and again with Match.
        var match = PlayerConnectionPortRegex.Match(fileContents);
        if (!match.Success)
        {
            throw new Exception("Could not find PlayerConnection port in logfile");
        }

        var port = ushort.Parse(match.Groups[1].Value);
        if (port == 0)
        {
            throw new Exception("PlayerConnection port cannot be 0.");
        }

        return port;
    }
}
}