Skip to content

Commit

Permalink
Implement SubscriptionFailure Event and Enhance EnumeratorWrapper Exc…
Browse files Browse the repository at this point in the history
…eption Message (#5)

* feat: handle event error message from EventSourceCollection

* feat: send error message through chain of classes

* refactor: Separate tests for valid and invalid data test parameters

* feat: change access modifier in several variable

* test: test case with wrong api key
refactor: DQH tests
  • Loading branch information
Romazes committed Feb 9, 2024
1 parent 613cd4a commit f8115d9
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 52 deletions.
26 changes: 18 additions & 8 deletions QuantConnect.IEX.Tests/IEXDataDownloaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,31 @@ public void SetUp()
_downloader = new IEXDataDownloader();
}

private static IEnumerable<TestCaseData> HistoricalDataTestCases => IEXDataHistoryTests.TestParameters;
private static IEnumerable<TestCaseData> HistoricalValidDataTestCases => IEXDataHistoryTests.ValidDataTestParameters;
private static IEnumerable<TestCaseData> HistoricalInvalidDataTestCases => IEXDataHistoryTests.InvalidDataTestParameters;

[TestCaseSource(nameof(HistoricalDataTestCases))]
public void DownloadsHistoricalData(Symbol symbol, Resolution resolution, TickType tickType, TimeSpan period, bool isEmptyResult)

[TestCaseSource(nameof(HistoricalInvalidDataTestCases))]
public void DownloadsHistoricalDataWithInvalidDataTestParameters(Symbol symbol, Resolution resolution, TickType tickType, TimeSpan period)
{
var request = IEXDataHistoryTests.CreateHistoryRequest(symbol, resolution, tickType, period);

var parameters = new DataDownloaderGetParameters(symbol, resolution, request.StartTimeUtc, request.EndTimeUtc, tickType);

var downloadResponse = _downloader.Get(parameters).ToList();

if (!isEmptyResult)
{
Assert.IsEmpty(downloadResponse);
return;
}
Assert.IsEmpty(downloadResponse);
}

[Explicit("This tests require a iexcloud.io api key")]
[TestCaseSource(nameof(HistoricalValidDataTestCases))]
public void DownloadsHistoricalDataWithValidDataTestParameters(Symbol symbol, Resolution resolution, TickType tickType, TimeSpan period)
{
var request = IEXDataHistoryTests.CreateHistoryRequest(symbol, resolution, tickType, period);

var parameters = new DataDownloaderGetParameters(symbol, resolution, request.StartTimeUtc, request.EndTimeUtc, tickType);

var downloadResponse = _downloader.Get(parameters).ToList();

Assert.IsNotEmpty(downloadResponse);

Expand All @@ -57,6 +66,7 @@ public void DownloadsHistoricalData(Symbol symbol, Resolution resolution, TickTy

private static IEnumerable<TestCaseData> SymbolDaysBeforeCaseData => IEXDataHistoryTests.SymbolDaysBeforeCaseData;

[Explicit("This tests require a iexcloud.io api key")]
[Test, TestCaseSource(nameof(SymbolDaysBeforeCaseData))]
public void DownloadsHistoricalDataDailyForYears(Symbol symbol, int amountDaysBefore)
{
Expand Down
52 changes: 32 additions & 20 deletions QuantConnect.IEX.Tests/IEXDataHistoryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,47 +52,53 @@ public void TearDown()
/// <remarks>
/// The test parameters include valid and invalid combinations of input data.
/// </remarks>
internal static IEnumerable<TestCaseData> TestParameters
internal static IEnumerable<TestCaseData> ValidDataTestParameters
{
get
{
// Valid parameters
yield return new TestCaseData(Symbols.SPY, Resolution.Daily, TickType.Trade, TimeSpan.FromDays(15), true)
yield return new TestCaseData(Symbols.SPY, Resolution.Daily, TickType.Trade, TimeSpan.FromDays(15))
.SetDescription("Valid parameters - Daily resolution, 15 days period.")
.SetCategory("Valid");

yield return new TestCaseData(Symbols.SPY, Resolution.Minute, TickType.Trade, TimeSpan.FromDays(5), true)
yield return new TestCaseData(Symbols.SPY, Resolution.Minute, TickType.Trade, TimeSpan.FromDays(5))
.SetDescription("Valid parameters - Minute resolution, 5 days period.")
.SetCategory("Valid");

yield return new TestCaseData(Symbols.SPY, Resolution.Minute, TickType.Trade, TimeSpan.FromDays(45), true)
.SetDescription("Valid parameters - Beyond 45 days, Minute resolution.")
.SetCategory("Valid");
}
}

internal static IEnumerable<TestCaseData> InvalidDataTestParameters
{
get
{
// Invalid resolution - empty result
yield return new TestCaseData(Symbols.SPY, Resolution.Tick, TickType.Trade, TimeSpan.FromSeconds(15), false)
yield return new TestCaseData(Symbols.SPY, Resolution.Tick, TickType.Trade, TimeSpan.FromSeconds(15))
.SetDescription("Invalid resolution - Tick resolution, 15 seconds period.")
.SetCategory("Invalid");

yield return new TestCaseData(Symbols.SPY, Resolution.Second, TickType.Trade, Time.OneMinute, false)
yield return new TestCaseData(Symbols.SPY, Resolution.Second, TickType.Trade, Time.OneMinute)
.SetDescription("Invalid resolution - Second resolution, 1 minute period.")
.SetCategory("Invalid");

yield return new TestCaseData(Symbols.SPY, Resolution.Hour, TickType.Trade, Time.OneDay, false)
yield return new TestCaseData(Symbols.SPY, Resolution.Hour, TickType.Trade, Time.OneDay)
.SetDescription("Invalid resolution - Hour resolution, 1 day period.")
.SetCategory("Invalid");

yield return new TestCaseData(Symbols.SPY, Resolution.Minute, TickType.Trade, TimeSpan.FromDays(45), true)
.SetDescription("Valid parameters - Beyond 45 days, Minute resolution.")
.SetCategory("Valid");

yield return new TestCaseData(Symbols.SPY, Resolution.Daily, TickType.Trade, TimeSpan.FromDays(-15), false)
yield return new TestCaseData(Symbols.SPY, Resolution.Daily, TickType.Trade, TimeSpan.FromDays(-15))
.SetDescription("Invalid period - Date in the future, Daily resolution.")
.SetCategory("Invalid");

// Invalid data type - empty result
yield return new TestCaseData(Symbols.SPY, Resolution.Daily, TickType.Quote, TimeSpan.FromDays(15), false)
yield return new TestCaseData(Symbols.SPY, Resolution.Daily, TickType.Quote, TimeSpan.FromDays(15))
.SetDescription("Invalid data type - Daily resolution, QuoteBar data type.")
.SetCategory("Invalid");

// Invalid security type, no exception, empty result
yield return new TestCaseData(Symbols.EURUSD, Resolution.Daily, TickType.Trade, TimeSpan.FromDays(15), false)
yield return new TestCaseData(Symbols.EURUSD, Resolution.Daily, TickType.Trade, TimeSpan.FromDays(15))
.SetDescription("Invalid security type - EURUSD symbol, Daily resolution.")
.SetCategory("Invalid");
}
Expand All @@ -115,6 +121,7 @@ internal static IEnumerable<TestCaseData> SymbolDaysBeforeCaseData
}
}

[Explicit("This tests require a iexcloud.io api key")]
[Test, TestCaseSource(nameof(SymbolDaysBeforeCaseData))]
public void IEXCloudGetHistoryDailyForYears(Symbol symbol, int amountDaysBefore)
{
Expand All @@ -124,16 +131,20 @@ public void IEXCloudGetHistoryDailyForYears(Symbol symbol, int amountDaysBefore)
Assert.Greater(slices.Count, 1);
}

[Test, TestCaseSource(nameof(TestParameters))]
public void IEXCouldGetHistory(Symbol symbol, Resolution resolution, TickType tickType, TimeSpan period, bool received)
[Test, TestCaseSource(nameof(InvalidDataTestParameters))]
public void IEXCloudGetHistoryWithInvalidDataTestParameters(Symbol symbol, Resolution resolution, TickType tickType, TimeSpan period)
{
var slices = GetHistory(symbol, resolution, tickType, period);

if (!received)
{
Assert.IsEmpty(slices);
return;
}
Assert.IsEmpty(slices);
return;
}

[Explicit("This tests require a iexcloud.io api key")]
[Test, TestCaseSource(nameof(ValidDataTestParameters))]
public void IEXCloudGetHistoryWithValidDataTestParameters(Symbol symbol, Resolution resolution, TickType tickType, TimeSpan period)
{
var slices = GetHistory(symbol, resolution, tickType, period);

Assert.IsNotEmpty(slices);

Expand Down Expand Up @@ -189,6 +200,7 @@ public void GetHistoryInvalidSymbolThrowException(Symbol symbol, Resolution reso
Assert.Throws<ArgumentException>(() => GetHistory(symbol, resolution, tickType, period));
}

[Explicit("This tests require a iexcloud.io api key")]
[TestCase(10)]
[TestCase(20)]
public void GetHistoryReturnsValidDataForMultipleConcurrentRequests(int amountOfTask)
Expand Down
88 changes: 68 additions & 20 deletions QuantConnect.IEX.Tests/IEXDataQueueHandlerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,17 @@
using Microsoft.CodeAnalysis;
using System.Threading.Tasks;
using QuantConnect.Data.Market;
using QuantConnect.Configuration;
using System.Collections.Generic;
using System.Collections.Concurrent;
using QuantConnect.Data.UniverseSelection;

namespace QuantConnect.IEX.Tests
{
[TestFixture]
[TestFixture, Explicit("This tests require a iexcloud.io api key")]
public class IEXDataQueueHandlerTests
{
private CancellationTokenSource _cancellationTokenSource;
private IEXDataQueueHandler iexDataQueueHandler;

private static readonly string[] HardCodedSymbolsSNP = {
Expand Down Expand Up @@ -73,6 +75,7 @@ public class IEXDataQueueHandlerTests
[SetUp]
public void SetUp()
{
_cancellationTokenSource = new();
iexDataQueueHandler = new IEXDataQueueHandler();
}

Expand All @@ -83,6 +86,8 @@ public void TearDown()
{
iexDataQueueHandler.Dispose();
}

_cancellationTokenSource.Dispose();
}

protected static IEnumerable<TestCaseData> TestCaseDataSymbolsConfigs
Expand Down Expand Up @@ -142,25 +147,25 @@ public void IEXCouldSubscribeManyTimes(Queue<SubscriptionDataConfig> configs)
ticks.Add(tick);
}
});
},
() => _cancellationTokenSource.Cancel());
}

Thread.Sleep(20_000);
Assert.IsFalse(_cancellationTokenSource.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(20)), "The cancellation token was cancelled block thread.");

foreach (var config in configs)
{
iexDataQueueHandler.Unsubscribe(config);
}

Thread.Sleep(20000);
_cancellationTokenSource.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(20));

Assert.Greater(ticks.Count, 0);
}

[TestCaseSource(nameof(TestCaseDataSymbolsConfigs))]
public void IEXSubscribeToSeveralThenUnSubscribeExceptOne(Queue<SubscriptionDataConfig> configs)
{
var cancellationTokenSource = new CancellationTokenSource();
var resetEvent = new ManualResetEvent(false);

var tempDictionary = new ConcurrentDictionary<Symbol, int>();
Expand All @@ -186,10 +191,11 @@ public void IEXSubscribeToSeveralThenUnSubscribeExceptOne(Queue<SubscriptionData
resetEvent.Set();
}
}
});
},
() => _cancellationTokenSource.Cancel());
}

Assert.IsTrue(resetEvent.WaitOne(TimeSpan.FromSeconds(30), cancellationTokenSource.Token));
Assert.IsTrue(resetEvent.WaitOne(TimeSpan.FromSeconds(30), _cancellationTokenSource.Token));

// Unsubscribe from all symbols except one. (Why 2, cuz we have subscribe on Quote And Trade)
for (int i = configs.Count; i > 2; i--)
Expand All @@ -205,7 +211,7 @@ public void IEXSubscribeToSeveralThenUnSubscribeExceptOne(Queue<SubscriptionData
resetEvent.Reset();
tempDictionary = new ConcurrentDictionary<Symbol, int>();

Assert.IsTrue(resetEvent.WaitOne(TimeSpan.FromSeconds(30), cancellationTokenSource.Token));
Assert.IsTrue(resetEvent.WaitOne(TimeSpan.FromSeconds(30), _cancellationTokenSource.Token));

for (int i = configs.Count; i > 0; i--)
{
Expand All @@ -214,7 +220,7 @@ public void IEXSubscribeToSeveralThenUnSubscribeExceptOne(Queue<SubscriptionData
}

Thread.Sleep(TimeSpan.FromSeconds(1));
cancellationTokenSource.Cancel();
_cancellationTokenSource.Cancel();
}

[Test]
Expand All @@ -234,6 +240,11 @@ public void IEXCouldSubscribeAndUnsubscribe()
}
};

Action throwExceptionCallback = () =>
{
_cancellationTokenSource.Cancel();
};

var configs = new[] {
GetSubscriptionDataConfig<TradeBar>(Symbol.Create("MBLY", SecurityType.Equity, Market.USA), Resolution.Second),
GetSubscriptionDataConfig<TradeBar>(Symbol.Create("USO", SecurityType.Equity, Market.USA), Resolution.Second)
Expand All @@ -243,36 +254,36 @@ public void IEXCouldSubscribeAndUnsubscribe()
{
ProcessFeed(
iexDataQueueHandler.Subscribe(c, (s, e) => { }),
callback);
callback,
throwExceptionCallback);
});

Thread.Sleep(20000);
Assert.IsFalse(_cancellationTokenSource.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(20)), "The cancellation token was cancelled block thread.");

iexDataQueueHandler.Unsubscribe(Enumerable.First(configs, c => string.Equals(c.Symbol.Value, "MBLY")));

Log.Trace("Unsubscribing");
Thread.Sleep(2000);
_cancellationTokenSource.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(2));
// some messages could be inflight, but after a pause all MBLY messages must have beed consumed by ProcessFeed
unsubscribed = true;

Thread.Sleep(20000);
_cancellationTokenSource.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(20));
}

[Test, Explicit("Tests are dependent on network and are long")]
[Test]
public void IEXCouldSubscribeMoreThan100Symbols()
{
var cancellationTokenSource = new CancellationTokenSource();
var resetEvent = new AutoResetEvent(false);

foreach (var ticker in HardCodedSymbolsSNP.Take(250))
{
foreach (var config in GetSubscriptionDataConfigs(ticker, Resolution.Second))
{
iexDataQueueHandler.Subscribe(config, (s, e) => { });
ProcessFeed(iexDataQueueHandler.Subscribe(config, (s, e) => { }), throwExceptionCallback: () => _cancellationTokenSource.Cancel());
}
}

resetEvent.WaitOne(TimeSpan.FromMinutes(2), cancellationTokenSource.Token);
Assert.IsFalse(resetEvent.WaitOne(TimeSpan.FromMinutes(2), _cancellationTokenSource.Token), "The cancellation token was cancelled block thread.");
Assert.IsTrue(iexDataQueueHandler.IsConnected);
}

Expand Down Expand Up @@ -319,7 +330,37 @@ public void NotSubscribeOnCanonicalSymbol()
}
}

private void ProcessFeed(IEnumerator<BaseData> enumerator, Action<BaseData> callback = null)
[Test]
public void SubscribeWithWrongApiKeyThrowException()
{
Config.Set("iex-cloud-api-key", "wrong-api-key");

var isSubscribeThrowException = false;

var iexDataQueueHandler = new IEXDataQueueHandler();

foreach (var config in GetSubscriptionDataConfigs(Symbols.SPY, Resolution.Second))
{
ProcessFeed(
iexDataQueueHandler.Subscribe(config, (s, e) => { }),
tick =>
{
if (tick != null)
{
Log.Debug($"{nameof(IEXDataQueueHandlerTests)}: tick: {tick}");
}
},
() => {
isSubscribeThrowException = true;
_cancellationTokenSource.Cancel();
});
}

_cancellationTokenSource.Token.WaitHandle.WaitOne(TimeSpan.FromSeconds(20));
Assert.IsTrue(isSubscribeThrowException);
}

private void ProcessFeed(IEnumerator<BaseData> enumerator, Action<BaseData> callback = null, Action throwExceptionCallback = null)
{
Task.Factory.StartNew(() =>
{
Expand All @@ -339,9 +380,16 @@ private void ProcessFeed(IEnumerator<BaseData> enumerator, Action<BaseData> call
}
catch (Exception err)
{
Log.Error(err.Message);
throw;
}
});
}).ContinueWith(task =>
{
if (throwExceptionCallback != null)
{
throwExceptionCallback();
}
Log.Error("The throwExceptionCallback is null.");
}, TaskContinuationOptions.OnlyOnFaulted);
}

private static IEnumerable<SubscriptionDataConfig> GetSubscriptionDataConfigs(string ticker, Resolution resolution)
Expand Down

0 comments on commit f8115d9

Please sign in to comment.