Skip to content

Commit

Permalink
Fix #1106 and #1105
Browse files Browse the repository at this point in the history
  • Loading branch information
karb0f0s committed Jan 10, 2024
1 parent c3c493c commit 1d2bc54
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 51 deletions.
8 changes: 4 additions & 4 deletions src/Telegram.Bot/Polling/Abstractions/IUpdateHandler.cs
@@ -1,4 +1,4 @@
using System.Threading;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;

Expand All @@ -20,7 +20,7 @@ public interface IUpdateHandler
/// </param>
/// <param name="update">The <see cref="Update"/> to handle</param>
/// <param name="cancellationToken">
/// The <see cref="CancellationToken"/> which will notify that method execution should be cancelled
/// The <see cref="CancellationToken"/> which will notify that method execution should be canceled
/// </param>
/// <returns></returns>
Task HandleUpdateAsync(ITelegramBotClient botClient, Update update, CancellationToken cancellationToken);
Expand All @@ -33,10 +33,10 @@ public interface IUpdateHandler
/// </param>
/// <param name="exception">The <see cref="Exception"/> to handle</param>
/// <param name="cancellationToken">
/// The <see cref="CancellationToken"/> which will notify that method execution should be cancelled
/// The <see cref="CancellationToken"/> which will notify that method execution should be canceled
/// </param>
/// <returns></returns>
Task HandlePollingErrorAsync(
Task HandleErrorAsync(
ITelegramBotClient botClient,
Exception exception,
CancellationToken cancellationToken
Expand Down
14 changes: 7 additions & 7 deletions src/Telegram.Bot/Polling/DefaultUpdateHandler.cs
@@ -1,4 +1,4 @@
using System.Threading;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;

Expand All @@ -11,19 +11,19 @@ namespace Telegram.Bot.Polling;
public class DefaultUpdateHandler : IUpdateHandler
{
readonly Func<ITelegramBotClient, Update, CancellationToken, Task> _updateHandler;
readonly Func<ITelegramBotClient, Exception, CancellationToken, Task> _pollingErrorHandler;
readonly Func<ITelegramBotClient, Exception, CancellationToken, Task> _errorHandler;

/// <summary>
/// Constructs a new <see cref="DefaultUpdateHandler"/> with the specified callback functions
/// </summary>
/// <param name="updateHandler">The function to invoke when an update is received</param>
/// <param name="pollingErrorHandler">The function to invoke when an error occurs</param>
/// <param name="errorHandler">The function to invoke when an error occurs</param>
public DefaultUpdateHandler(
Func<ITelegramBotClient, Update, CancellationToken, Task> updateHandler,
Func<ITelegramBotClient, Exception, CancellationToken, Task> pollingErrorHandler)
Func<ITelegramBotClient, Exception, CancellationToken, Task> errorHandler)
{
_updateHandler = updateHandler ?? throw new ArgumentNullException(nameof(updateHandler));
_pollingErrorHandler = pollingErrorHandler ?? throw new ArgumentNullException(nameof(pollingErrorHandler));
_errorHandler = errorHandler ?? throw new ArgumentNullException(nameof(errorHandler));
}

/// <inheritdoc />
Expand All @@ -35,10 +35,10 @@ CancellationToken cancellationToken
await _updateHandler(botClient, update, cancellationToken).ConfigureAwait(false);

/// <inheritdoc />
public async Task HandlePollingErrorAsync(
public async Task HandleErrorAsync(
ITelegramBotClient botClient,
Exception exception,
CancellationToken cancellationToken
) =>
await _pollingErrorHandler(botClient, exception, cancellationToken).ConfigureAwait(false);
await _errorHandler(botClient, exception, cancellationToken).ConfigureAwait(false);
}
77 changes: 53 additions & 24 deletions src/Telegram.Bot/Polling/DefaultUpdateReceiver.cs
@@ -1,3 +1,6 @@
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
Expand Down Expand Up @@ -41,12 +44,13 @@ public class DefaultUpdateReceiver : IUpdateReceiver
var limit = _receiverOptions?.Limit ?? default;
var messageOffset = _receiverOptions?.Offset ?? 0;
var emptyUpdates = EmptyUpdates;
var runningTasks = new List<Task>();

if (_receiverOptions?.ThrowPendingUpdates is true)
{
try
{
messageOffset = await _botClient.ThrowOutPendingUpdatesAsync(
messageOffset = await _botClient.DiscardPendingUpdatesAsync(
cancellationToken: cancellationToken
).ConfigureAwait(false);
}
Expand All @@ -58,7 +62,7 @@ public class DefaultUpdateReceiver : IUpdateReceiver

while (!cancellationToken.IsCancellationRequested)
{
var timeout = (int) _botClient.Timeout.TotalSeconds;
var timeout = (int)_botClient.Timeout.TotalSeconds;
var updates = emptyUpdates;
try
{
Expand All @@ -71,49 +75,74 @@ public class DefaultUpdateReceiver : IUpdateReceiver
};
updates = await _botClient.MakeRequestAsync(
request: request,
cancellationToken:
cancellationToken
).ConfigureAwait(false);
cancellationToken: cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// Ignore
}
#pragma warning disable CA1031
catch (Exception exception)
#pragma warning restore CA1031
{
try
{
await updateHandler.HandlePollingErrorAsync(
await updateHandler.HandleErrorAsync(
botClient: _botClient,
exception: exception,
cancellationToken: cancellationToken
).ConfigureAwait(false);
cancellationToken: cancellationToken)
.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// ignored
}

// Cooldown on network error
if (exception is HttpRequestException)
await Task.Delay(500, cancellationToken).ConfigureAwait(false);
}

foreach (var update in updates)
{
try
{
await updateHandler.HandleUpdateAsync(
botClient: _botClient,
update: update,
cancellationToken: cancellationToken
).ConfigureAwait(false);

messageOffset = update.Id + 1;
}
catch (OperationCanceledException)
{
// ignored
}
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
runningTasks.Add(SafeInvoke(updateHandler, update, cts.Token));
}

messageOffset = updates.Length > 0
? updates[updates.Length - 1].Id + 1
: 0;
}

var faultedTasks = runningTasks.Where(t => t.Status == TaskStatus.Faulted);
if (faultedTasks.Any())
{
throw new AggregateException(faultedTasks.SelectMany(t => t.Exception!.InnerExceptions));
}
runningTasks.RemoveAll(t => t.IsCompleted);

cancellationToken.ThrowIfCancellationRequested();
}

private async Task SafeInvoke(IUpdateHandler updateHandler, Update update, CancellationToken cancellationToken)
{
try
{
await updateHandler.HandleUpdateAsync(
botClient: _botClient,
update: update,
cancellationToken: cancellationToken
).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// ignored
}
catch (Exception exception)
{
await updateHandler.HandleErrorAsync(
botClient: _botClient,
exception: exception,
cancellationToken: cancellationToken
).ConfigureAwait(false);
}
}
}
2 changes: 1 addition & 1 deletion src/Telegram.Bot/Polling/TelegramBotClientExtensions.cs
Expand Up @@ -15,7 +15,7 @@ internal static class TelegramBotClientExtensions
/// <returns>
/// Update ID of the last <see cref="Update"/> increased by 1 if there were any
/// </returns>
internal static async Task<int> ThrowOutPendingUpdatesAsync(
internal static async Task<int> DiscardPendingUpdatesAsync(
this ITelegramBotClient botClient,
CancellationToken cancellationToken = default)
{
Expand Down
30 changes: 15 additions & 15 deletions src/Telegram.Bot/TelegramBotClientExtensions.Polling.cs
Expand Up @@ -49,23 +49,23 @@ public static partial class TelegramBotClientExtensions
/// </summary>
/// <param name="botClient">The <see cref="ITelegramBotClient"/> used for making GetUpdates calls</param>
/// <param name="updateHandler">Delegate used for processing <see cref="Update"/>s</param>
/// <param name="pollingErrorHandler">Delegate used for processing polling errors</param>
/// <param name="errorHandler">Delegate used for processing errors</param>
/// <param name="receiverOptions">Options used to configure getUpdates request</param>
/// <param name="cancellationToken">
/// The <see cref="CancellationToken"/> with which you can stop receiving
/// </param>
public static void StartReceiving(
this ITelegramBotClient botClient,
Func<ITelegramBotClient, Update, CancellationToken, Task> updateHandler,
Func<ITelegramBotClient, Exception, CancellationToken, Task> pollingErrorHandler,
Func<ITelegramBotClient, Exception, CancellationToken, Task> errorHandler,
ReceiverOptions? receiverOptions = default,
CancellationToken cancellationToken = default
) =>
StartReceiving(
botClient: botClient,
updateHandler: new DefaultUpdateHandler(
updateHandler: updateHandler,
pollingErrorHandler: pollingErrorHandler
errorHandler: errorHandler
),
receiverOptions: receiverOptions,
cancellationToken: cancellationToken
Expand All @@ -80,15 +80,15 @@ public static partial class TelegramBotClientExtensions
/// </summary>
/// <param name="botClient">The <see cref="ITelegramBotClient"/> used for making GetUpdates calls</param>
/// <param name="updateHandler">Delegate used for processing <see cref="Update"/>s</param>
/// <param name="pollingErrorHandler">Delegate used for processing polling errors</param>
/// <param name="errorHandler">Delegate used for processing errors</param>
/// <param name="receiverOptions">Options used to configure getUpdates request</param>
/// <param name="cancellationToken">
/// The <see cref="CancellationToken"/> with which you can stop receiving
/// </param>
public static void StartReceiving(
this ITelegramBotClient botClient,
Action<ITelegramBotClient, Update, CancellationToken> updateHandler,
Action<ITelegramBotClient, Exception, CancellationToken> pollingErrorHandler,
Action<ITelegramBotClient, Exception, CancellationToken> errorHandler,
ReceiverOptions? receiverOptions = default,
CancellationToken cancellationToken = default
) =>
Expand All @@ -100,9 +100,9 @@ public static partial class TelegramBotClientExtensions
updateHandler.Invoke(bot, update, token);
return Task.CompletedTask;
},
pollingErrorHandler: (bot, exception, token) =>
errorHandler: (bot, exception, token) =>
{
pollingErrorHandler.Invoke(bot, exception, token);
errorHandler.Invoke(bot, exception, token);
return Task.CompletedTask;
}
),
Expand Down Expand Up @@ -157,7 +157,7 @@ public static partial class TelegramBotClientExtensions
{
try
{
await updateHandler.HandlePollingErrorAsync(
await updateHandler.HandleErrorAsync(
botClient: botClient,
exception: ex,
cancellationToken: cancellationToken
Expand Down Expand Up @@ -213,7 +213,7 @@ public static partial class TelegramBotClientExtensions
/// </summary>
/// <param name="botClient">The <see cref="ITelegramBotClient"/> used for making GetUpdates calls</param>
/// <param name="updateHandler">Delegate used for processing <see cref="Update"/>s</param>
/// <param name="pollingErrorHandler">Delegate used for processing polling errors</param>
/// <param name="errorHandler">Delegate used for processing errors</param>
/// <param name="receiverOptions">Options used to configure getUpdates requests</param>
/// <param name="cancellationToken">
/// The <see cref="CancellationToken"/> with which you can stop receiving
Expand All @@ -225,15 +225,15 @@ public static partial class TelegramBotClientExtensions
public static async Task ReceiveAsync(
this ITelegramBotClient botClient,
Func<ITelegramBotClient, Update, CancellationToken, Task> updateHandler,
Func<ITelegramBotClient, Exception, CancellationToken, Task> pollingErrorHandler,
Func<ITelegramBotClient, Exception, CancellationToken, Task> errorHandler,
ReceiverOptions? receiverOptions = default,
CancellationToken cancellationToken = default
) =>
await ReceiveAsync(
botClient: botClient,
updateHandler: new DefaultUpdateHandler(
updateHandler: updateHandler,
pollingErrorHandler: pollingErrorHandler
errorHandler: errorHandler
),
receiverOptions: receiverOptions,
cancellationToken: cancellationToken
Expand All @@ -249,7 +249,7 @@ public static partial class TelegramBotClientExtensions
/// </summary>
/// <param name="botClient">The <see cref="ITelegramBotClient"/> used for making GetUpdates calls</param>
/// <param name="updateHandler">Delegate used for processing <see cref="Update"/>s</param>
/// <param name="pollingErrorHandler">Delegate used for processing polling errors</param>
/// <param name="errorHandler">Delegate used for processing errors</param>
/// <param name="receiverOptions">Options used to configure getUpdates requests</param>
/// <param name="cancellationToken">
/// The <see cref="CancellationToken"/> with which you can stop receiving
Expand All @@ -261,7 +261,7 @@ public static partial class TelegramBotClientExtensions
public static async Task ReceiveAsync(
this ITelegramBotClient botClient,
Action<ITelegramBotClient, Update, CancellationToken> updateHandler,
Action<ITelegramBotClient, Exception, CancellationToken> pollingErrorHandler,
Action<ITelegramBotClient, Exception, CancellationToken> errorHandler,
ReceiverOptions? receiverOptions = default,
CancellationToken cancellationToken = default
) =>
Expand All @@ -273,9 +273,9 @@ public static partial class TelegramBotClientExtensions
updateHandler.Invoke(bot, update, token);
return Task.CompletedTask;
},
pollingErrorHandler: (bot, exception, token) =>
errorHandler: (bot, exception, token) =>
{
pollingErrorHandler.Invoke(bot, exception, token);
errorHandler.Invoke(bot, exception, token);
return Task.CompletedTask;
}
),
Expand Down

0 comments on commit 1d2bc54

Please sign in to comment.