-
Notifications
You must be signed in to change notification settings - Fork 90
/
JsonStreamReader.cs
223 lines (190 loc) · 7.93 KB
/
JsonStreamReader.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
namespace Microsoft.Psi.Data.Json
{
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Threading;
using Microsoft.Psi;
using Newtonsoft.Json.Linq;
/// <summary>
/// Represents a stream reader for JSON data stores.
/// </summary>
/// <remarks>
/// Only batch-style reading through <see cref="ReadAll(ReplayDescriptor, CancellationToken)"/> is implemented.
/// Seeking, indexing, stream-style access and the metadata query APIs throw <see cref="NotImplementedException"/>.
/// </remarks>
public class JsonStreamReader : IStreamReader, IDisposable
{
    // Per-stream dispatchers, keyed by stream id. Entries are registered in OpenStream under metadata.Id
    // and looked up in ReadAll by envelope.SourceId.
    private readonly Dictionary<int, Action<JToken, Envelope>> outputs = new Dictionary<int, Action<JToken, Envelope>>();

    // Extension this instance was constructed with; kept so the copy constructor can reuse it.
    private readonly string extension;

    /// <summary>
    /// Initializes a new instance of the <see cref="JsonStreamReader"/> class.
    /// </summary>
    /// <param name="name">The name of the application that generated the persisted files, or the root name of the files.</param>
    /// <param name="path">The directory in which the main persisted file resides or will reside, or null to create a volatile data store.</param>
    /// <param name="extension">The extension for the underlying file.</param>
    public JsonStreamReader(string name, string path, string extension)
        : this(extension)
    {
        this.Reader = new JsonStoreReader(name, path, extension);
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="JsonStreamReader"/> class using the default file extension.
    /// </summary>
    /// <param name="name">The name of the application that generated the persisted files, or the root name of the files.</param>
    /// <param name="path">The directory in which the main persisted file resides or will reside, or null to create a volatile data store.</param>
    public JsonStreamReader(string name, string path)
        : this(name, path, JsonStoreBase.DefaultExtension)
    {
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="JsonStreamReader"/> class.
    /// </summary>
    /// <remarks>
    /// The new instance is built from the name, path and extension of <paramref name="that"/>; it gets its own
    /// store reader and does not share any opened streams with the original.
    /// </remarks>
    /// <param name="that">Existing <see cref="JsonStreamReader"/> used to initialize new instance.</param>
    public JsonStreamReader(JsonStreamReader that)
        : this(that.Name, that.Path, that.extension)
    {
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="JsonStreamReader"/> class.
    /// </summary>
    /// <remarks>
    /// This constructor only records the extension. <see cref="Reader"/> stays null until it is assigned.
    /// </remarks>
    /// <param name="extension">The extension for the underlying file.</param>
    public JsonStreamReader(string extension = JsonStoreBase.DefaultExtension)
    {
        this.extension = extension;
    }

    /// <inheritdoc />
    public IEnumerable<IStreamMetadata> AvailableStreams => this.Reader?.AvailableStreams;

    /// <inheritdoc />
    public string Name => this.Reader?.Name;

    /// <inheritdoc />
    public string Path => this.Reader?.Path;

    /// <inheritdoc />
    public TimeInterval MessageOriginatingTimeInterval =>
        this.GetCoverage(metadata => new TimeInterval(metadata.FirstMessageOriginatingTime, metadata.LastMessageOriginatingTime));

    /// <inheritdoc />
    public TimeInterval MessageCreationTimeInterval =>
        this.GetCoverage(metadata => new TimeInterval(metadata.FirstMessageCreationTime, metadata.LastMessageCreationTime));

    /// <summary>
    /// Gets or sets the underlying store reader.
    /// </summary>
    /// <remarks>
    /// May be null when the instance was created with the extension-only constructor and no reader has been assigned yet.
    /// </remarks>
    protected JsonStoreReader Reader { get; set; }

    /// <summary>
    /// Closes all open streams.
    /// </summary>
    /// <remarks>
    /// Removes every registered stream callback and, when a store reader is present, closes its streams as well.
    /// </remarks>
    public void CloseAllStreams()
    {
        this.outputs.Clear();

        // Null-conditional for consistency with Dispose/Name/Path: a reader-less instance has nothing else to close.
        this.Reader?.CloseAllStreams();
    }

    /// <inheritdoc />
    public void Dispose()
    {
        this.Reader?.Dispose();
    }

    /// <inheritdoc />
    public virtual IStreamReader OpenNew()
    {
        return new JsonStreamReader(this);
    }

    /// <inheritdoc />
    /// <exception cref="ArgumentNullException"><paramref name="streamName"/> is null, empty or whitespace, or <paramref name="target"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="allocator"/> is not null, or the stream was already opened on this reader.</exception>
    public IStreamMetadata OpenStream<T>(string streamName, Action<T, Envelope> target, Func<T> allocator = null, Action<SerializationException> errorHandler = null)
    {
        // Note: errorHandler is not used by this implementation.
        if (string.IsNullOrWhiteSpace(streamName))
        {
            throw new ArgumentNullException(nameof(streamName));
        }

        if (target == null)
        {
            throw new ArgumentNullException(nameof(target));
        }

        if (allocator != null)
        {
            throw new ArgumentException("Allocators are not used by JsonStreamReader and must be null.", nameof(allocator));
        }

        var metadata = this.Reader.OpenStream(streamName);

        if (this.outputs.ContainsKey(metadata.Id))
        {
            throw new ArgumentException($"Stream named '{streamName}' was already opened and can only be opened once.", nameof(streamName));
        }

        // Convert each raw JSON token to T before handing it to the caller's target.
        this.outputs[metadata.Id] = (token, envelope) => target(token.ToObject<T>(), envelope);
        return metadata;
    }

    /// <inheritdoc />
    /// <exception cref="NotImplementedException">Always thrown; indexing is not supported.</exception>
    public IStreamMetadata OpenStreamIndex<T>(string streamName, Action<Func<IStreamReader, T>, Envelope> target)
    {
        throw new NotImplementedException("JsonStreamReader does not support indexing.");
    }

    /// <inheritdoc />
    public void ReadAll(ReplayDescriptor descriptor, CancellationToken cancelationToken = default(CancellationToken))
    {
        bool hasMoreData = true;
        Envelope envelope;

        this.Reader.Seek(descriptor);

        while (hasMoreData)
        {
            // Cancellation is checked once per message and ends the read silently (no exception is thrown).
            if (cancelationToken.IsCancellationRequested)
            {
                return;
            }

            hasMoreData = this.Reader.MoveNext(out envelope);
            if (hasMoreData)
            {
                JToken token;

                // The token is dispatched regardless of the value returned by Read;
                // that value only decides whether the loop continues.
                hasMoreData = this.Reader.Read(out token);
                this.outputs[envelope.SourceId](token, envelope);
            }
        }
    }

    /// <inheritdoc />
    /// <exception cref="NotImplementedException">Always thrown; seeking is not supported.</exception>
    public void Seek(TimeInterval interval, bool useOriginatingTime = false)
    {
        throw new NotImplementedException("JsonStreamReader does not support seeking.");
    }

    /// <inheritdoc />
    /// <exception cref="NotImplementedException">Always thrown; stream-style access is not supported.</exception>
    public bool MoveNext(out Envelope envelope)
    {
        throw new NotImplementedException("JsonStreamReader does not support stream-style access.");
    }

    /// <inheritdoc />
    /// <exception cref="NotImplementedException">Always thrown; stream-style access is not supported.</exception>
    public bool IsLive()
    {
        throw new NotImplementedException("JsonStreamReader does not support stream-style access.");
    }

    /// <inheritdoc />
    /// <exception cref="NotImplementedException">Always thrown; metadata lookup is not supported.</exception>
    public IStreamMetadata GetStreamMetadata(string name)
    {
        throw new NotImplementedException("JsonStreamReader does not support metadata.");
    }

    /// <inheritdoc />
    /// <exception cref="NotImplementedException">Always thrown; supplemental metadata is not supported.</exception>
    public T GetSupplementalMetadata<T>(string streamName)
    {
        throw new NotImplementedException("JsonStreamReader does not support supplemental metadata.");
    }

    /// <inheritdoc />
    /// <exception cref="NotImplementedException">Always thrown; this API is not supported.</exception>
    public bool ContainsStream(string name)
    {
        throw new NotImplementedException("JsonStreamReader does not support this API.");
    }

    /// <summary>
    /// Folds a per-stream time interval over all available streams into a single covering interval.
    /// </summary>
    /// <param name="intervalSelector">Produces the interval of interest for one stream's metadata.</param>
    /// <returns>
    /// The accumulated coverage, or <see cref="TimeInterval.Empty"/> when there are no available streams
    /// (including when no store reader has been assigned).
    /// </returns>
    private TimeInterval GetCoverage(Func<IStreamMetadata, TimeInterval> intervalSelector)
    {
        TimeInterval coverage = TimeInterval.Empty;

        // AvailableStreams is null when Reader is null; treat that as "no streams" instead of throwing.
        var streams = this.AvailableStreams;
        if (streams == null)
        {
            return coverage;
        }

        foreach (var metadata in streams)
        {
            coverage = TimeInterval.Coverage(new TimeInterval[] { coverage, intervalSelector(metadata) });
        }

        return coverage;
    }
}
}