Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix reopen secure channel without activate #2577

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
6 changes: 4 additions & 2 deletions Applications/ConsoleReferenceClient/ClientSamples.cs
Expand Up @@ -699,7 +699,7 @@ public void SubscribeToDataChanges(ISession session, uint minLifeTime)
/// Outputs elapsed time information for perf testing and lists all
/// types that were successfully added to the session encodeable type factory.
/// </remarks>
public async Task LoadTypeSystemAsync(ISession session)
public async Task<ComplexTypeSystem> LoadTypeSystemAsync(ISession session)
{
m_output.WriteLine("Load the server type system.");

Expand Down Expand Up @@ -732,6 +732,8 @@ public async Task LoadTypeSystemAsync(ISession session)
}
}
}

return complexTypeSystem;
}
#endregion

Expand Down Expand Up @@ -900,7 +902,7 @@ Task FetchReferenceIdTypesAsync(ISession session)
StartNodeId = item.NodeId,
AttributeId = Attributes.Value,
SamplingInterval = samplingInterval,
DisplayName = item.DisplayName?.Text ?? item.BrowseName.Name,
DisplayName = item.DisplayName?.Text ?? item.BrowseName?.Name ?? "unknown",
QueueSize = queueSize,
DiscardOldest = true,
MonitoringMode = MonitoringMode.Reporting,
Expand Down
40 changes: 33 additions & 7 deletions Applications/ConsoleReferenceClient/Program.cs
Expand Up @@ -234,7 +234,7 @@ public static async Task Main(string[] args)
var samples = new ClientSamples(output, ClientBase.ValidateResponse, quitEvent, verbose);
if (loadTypes)
{
await samples.LoadTypeSystemAsync(uaClient.Session).ConfigureAwait(false);
var complexTypeSystem = await samples.LoadTypeSystemAsync(uaClient.Session).ConfigureAwait(false);
}

if (browseall || fetchall || jsonvalues)
Expand Down Expand Up @@ -266,8 +266,8 @@ public static async Task Main(string[] args)

if (subscribe && (browseall || fetchall))
{
// subscribe to 100 random variables
const int MaxVariables = 100;
// subscribe to 1000 random variables
const int MaxVariables = 1000;
NodeCollection variables = new NodeCollection();
Random random = new Random(62541);
if (fetchall)
Expand All @@ -291,15 +291,41 @@ public static async Task Main(string[] args)

await samples.SubscribeAllValuesAsync(uaClient,
variableIds: new NodeCollection(variables),
samplingInterval: 1000,
publishingInterval: 5000,
samplingInterval: 100,
publishingInterval: 1000,
queueSize: 10,
lifetimeCount: 12,
lifetimeCount: 60,
keepAliveCount: 2).ConfigureAwait(false);

// Wait for DataChange notifications from MonitoredItems
output.WriteLine("Subscribed to {0} variables. Press Ctrl-C to exit.", MaxVariables);
quit = quitEvent.WaitOne(timeout > 0 ? waitTime : Timeout.Infinite);

// free unused memory
uaClient.Session.NodeCache.Clear();

waitTime = timeout - (int)DateTime.UtcNow.Subtract(start).TotalMilliseconds;
DateTime endTime = waitTime > 0 ? DateTime.UtcNow.Add(TimeSpan.FromMilliseconds(waitTime)) : DateTime.MaxValue;
var variableIterator = variables.GetEnumerator();
while (!quit && endTime > DateTime.UtcNow)
{
if (variableIterator.MoveNext())
{
try
{
var value = await uaClient.Session.ReadValueAsync(variableIterator.Current.NodeId).ConfigureAwait(false);
output.WriteLine("Value of {0} is {1}", variableIterator.Current.NodeId, value);
}
catch (Exception ex)
{
output.WriteLine("Error reading value of {0}: {1}", variableIterator.Current.NodeId, ex.Message);
}
}
else
{
variableIterator = variables.GetEnumerator();
}
quit = quitEvent.WaitOne(500);
}
}
else
{
Expand Down
114 changes: 80 additions & 34 deletions Libraries/Opc.Ua.Client/Session.cs
Expand Up @@ -27,6 +27,10 @@
* http://opcfoundation.org/License/MIT/1.00/
* ======================================================================*/

#if NET6_0_OR_GREATER
#define PERIODIC_TIMER
#endif

using System;
using System.Collections.Generic;
using System.Diagnostics;
Expand All @@ -40,6 +44,7 @@
using System.Threading.Tasks;
using System.Xml;
using Microsoft.Extensions.Logging;
using Opc.Ua.Bindings;

namespace Opc.Ua.Client
{
Expand Down Expand Up @@ -741,17 +746,25 @@
/// Returns true if the session is not receiving keep alives.
/// </summary>
/// <remarks>
/// Set to true if the server does not respond for 2 times the KeepAliveInterval.
/// Set to false is communication recovers.
/// Set to true if the server does not respond for 2 times the KeepAliveInterval
/// or if another error was reported.
/// Set to false is communication is ok or recovered.
/// </remarks>
public bool KeepAliveStopped
{
get
{
TimeSpan delta = TimeSpan.FromTicks(DateTime.UtcNow.Ticks - Interlocked.Read(ref m_lastKeepAliveTime));
StatusCode lastKeepAliveErrorStatusCode = m_lastKeepAliveErrorStatusCode;
if (StatusCode.IsGood(lastKeepAliveErrorStatusCode) || lastKeepAliveErrorStatusCode == StatusCodes.BadNoCommunication)
{
TimeSpan delta = TimeSpan.FromTicks(DateTime.UtcNow.Ticks - Interlocked.Read(ref m_lastKeepAliveTime));

// add a guard band to allow for network lag.
return (m_keepAliveInterval + kKeepAliveGuardBand) <= delta.TotalMilliseconds;
// add a guard band to allow for network lag.
return (m_keepAliveInterval + kKeepAliveGuardBand) <= delta.TotalMilliseconds;
}

// another error was reported which caused keep alive to stop.
return true;

Check warning on line 767 in Libraries/Opc.Ua.Client/Session.cs

View check run for this annotation

Codecov / codecov/patch

Libraries/Opc.Ua.Client/Session.cs#L767

Added line #L767 was not covered by tests
}
}

Expand Down Expand Up @@ -1429,7 +1442,10 @@

if (!result.AsyncWaitHandle.WaitOne(kReconnectTimeout / 2))
{
Utils.LogWarning("WARNING: ACTIVATE SESSION timed out. {0}/{1}", GoodPublishRequestCount, OutstandingRequestCount);
var error = ServiceResult.Create(StatusCodes.BadRequestTimeout, "ACTIVATE SESSION timed out. {0}/{1}", GoodPublishRequestCount, OutstandingRequestCount);
Utils.LogWarning("WARNING: {0}", error.ToString());
var operation = result as ChannelAsyncOperation<int>;

Check warning on line 1447 in Libraries/Opc.Ua.Client/Session.cs

View check run for this annotation

Codecov / codecov/patch

Libraries/Opc.Ua.Client/Session.cs#L1445-L1447

Added lines #L1445 - L1447 were not covered by tests
operation?.Fault(false, error);
}

// reactivate session.
Expand Down Expand Up @@ -3690,6 +3706,7 @@
{
int keepAliveInterval = m_keepAliveInterval;

m_lastKeepAliveErrorStatusCode = StatusCodes.Good;
Interlocked.Exchange(ref m_lastKeepAliveTime, DateTime.UtcNow.Ticks);

m_serverState = ServerState.Unknown;
Expand All @@ -3709,7 +3726,7 @@
{
StopKeepAliveTimer();

#if NET6_0_OR_GREATER
#if PERIODIC_TIMER
// start periodic timer loop
var keepAliveTimer = new PeriodicTimer(TimeSpan.FromMilliseconds(keepAliveInterval));
_ = Task.Run(() => OnKeepAliveAsync(keepAliveTimer, nodesToRead));
Expand Down Expand Up @@ -3821,7 +3838,7 @@
}
}

#if NET6_0_OR_GREATER
#if PERIODIC_TIMER
/// <summary>
/// Sends a keep alive by reading from the server.
/// </summary>
Expand Down Expand Up @@ -3893,6 +3910,11 @@

AsyncRequestStarted(result, requestHeader.RequestHandle, DataTypes.ReadRequest);
}
catch (ServiceResultException sre) when (sre.StatusCode == StatusCodes.BadNotConnected)
{
// recover from error condition when secure channel is still alive
OnKeepAliveError(sre.Result);
}

Check warning on line 3917 in Libraries/Opc.Ua.Client/Session.cs

View check run for this annotation

Codecov / codecov/patch

Libraries/Opc.Ua.Client/Session.cs#L3916-L3917

Added lines #L3916 - L3917 were not covered by tests
catch (Exception e)
{
Utils.LogError("Could not send keep alive request: {0} {1}", e.GetType().FullName, e.Message);
Expand Down Expand Up @@ -3935,10 +3957,10 @@

return;
}
catch (ServiceResultException sre) when (sre.StatusCode == StatusCodes.BadSessionIdInvalid)
catch (ServiceResultException sre)
{
// recover from error condition when secure channel is still alive
OnKeepAliveError(ServiceResult.Create(StatusCodes.BadSessionIdInvalid, "Session unavailable for keep alive requests."));
OnKeepAliveError(sre.Result);
}
catch (Exception e)
{
Expand All @@ -3960,6 +3982,7 @@
return;
}

m_lastKeepAliveErrorStatusCode = StatusCodes.Good;

Check warning on line 3985 in Libraries/Opc.Ua.Client/Session.cs

View check run for this annotation

Codecov / codecov/patch

Libraries/Opc.Ua.Client/Session.cs#L3985

Added line #L3985 was not covered by tests
Interlocked.Exchange(ref m_lastKeepAliveTime, DateTime.UtcNow.Ticks);

lock (m_outstandingRequests)
Expand All @@ -3977,6 +4000,7 @@
}
else
{
m_lastKeepAliveErrorStatusCode = StatusCodes.Good;
Interlocked.Exchange(ref m_lastKeepAliveTime, DateTime.UtcNow.Ticks);
}

Expand All @@ -4003,22 +4027,28 @@
/// </summary>
protected virtual bool OnKeepAliveError(ServiceResult result)
{
long delta = DateTime.UtcNow.Ticks - Interlocked.Read(ref m_lastKeepAliveTime);
DateTime now = DateTime.UtcNow;

Utils.LogInfo(
"KEEP ALIVE LATE: {0}s, EndpointUrl={1}, RequestCount={2}/{3}",
((double)delta) / TimeSpan.TicksPerSecond,
this.Endpoint?.EndpointUrl,
this.GoodPublishRequestCount,
this.OutstandingRequestCount);
m_lastKeepAliveErrorStatusCode = result.StatusCode;
if (result.StatusCode == StatusCodes.BadNoCommunication)
{
// keep alive read timed out
long delta = now.Ticks - Interlocked.Read(ref m_lastKeepAliveTime);
Utils.LogInfo(
"KEEP ALIVE LATE: {0}s, EndpointUrl={1}, RequestCount={2}/{3}",
((double)delta) / TimeSpan.TicksPerSecond,
this.Endpoint?.EndpointUrl,
this.GoodPublishRequestCount,
this.OutstandingRequestCount);
}

KeepAliveEventHandler callback = m_KeepAlive;

if (callback != null)
{
try
{
KeepAliveEventArgs args = new KeepAliveEventArgs(result, ServerState.Unknown, DateTime.UtcNow);
KeepAliveEventArgs args = new KeepAliveEventArgs(result, ServerState.Unknown, now);
callback(this, args);
return !args.CancelKeepAlive;
}
Expand Down Expand Up @@ -5074,43 +5104,40 @@

case StatusCodes.BadNoSubscription:
case StatusCodes.BadSessionClosed:
case StatusCodes.BadSessionIdInvalid:
case StatusCodes.BadSecureChannelIdInvalid:
case StatusCodes.BadSecureChannelClosed:
case StatusCodes.BadSecurityChecksFailed:
case StatusCodes.BadCertificateInvalid:
case StatusCodes.BadServerHalted:
return;

// may require a reconnect or activate to recover
case StatusCodes.BadSessionIdInvalid:
case StatusCodes.BadSecureChannelIdInvalid:
case StatusCodes.BadSecureChannelClosed:
OnKeepAliveError(error);
return;

// Servers may return this error when overloaded
case StatusCodes.BadTooManyOperations:
case StatusCodes.BadTcpServerTooBusy:
case StatusCodes.BadServerTooBusy:
// throttle the next publish to reduce server load
_ = Task.Run(async () => {
await Task.Delay(100).ConfigureAwait(false);
BeginPublish(OperationTimeout);
QueueBeginPublish();

Check warning on line 5126 in Libraries/Opc.Ua.Client/Session.cs

View check run for this annotation

Codecov / codecov/patch

Libraries/Opc.Ua.Client/Session.cs#L5126

Added line #L5126 was not covered by tests
});
return;

case StatusCodes.BadTimeout:
break;

default:
Utils.LogError(e, "PUBLISH #{0} - Unhandled error {1} during Publish.", requestHeader.RequestHandle, error.StatusCode);
goto case StatusCodes.BadServerTooBusy;

}
}

int requestCount = GoodPublishRequestCount;
int minPublishRequestCount = GetMinPublishRequestCount(false);

if (requestCount < minPublishRequestCount)
{
BeginPublish(OperationTimeout);
}
else
{
Utils.LogInfo("PUBLISH - Did not send another publish request. GoodPublishRequestCount={0}, MinPublishRequestCount={1}", requestCount, minPublishRequestCount);
}
QueueBeginPublish();
}

/// <inheritdoc/>
Expand Down Expand Up @@ -5201,6 +5228,24 @@
#endregion

#region Private Methods
/// <summary>
/// Queues a publish request if there are not enough outstanding requests.
/// </summary>
private void QueueBeginPublish()
{
int requestCount = GoodPublishRequestCount;
int minPublishRequestCount = GetMinPublishRequestCount(false);

if (requestCount < minPublishRequestCount)
{
BeginPublish(OperationTimeout);
}
else
{
Utils.LogInfo("PUBLISH - Did not send another publish request. GoodPublishRequestCount={0}, MinPublishRequestCount={1}", requestCount, minPublishRequestCount);
}
}

/// <summary>
/// Validates the identity for an open call.
/// </summary>
Expand Down Expand Up @@ -6315,9 +6360,10 @@
private long m_publishCounter;
private int m_tooManyPublishRequests;
private long m_lastKeepAliveTime;
private StatusCode m_lastKeepAliveErrorStatusCode;
private ServerState m_serverState;
private int m_keepAliveInterval;
#if NET6_0_OR_GREATER
#if PERIODIC_TIMER
private PeriodicTimer m_keepAliveTimer;
#else
private Timer m_keepAliveTimer;
Expand Down
29 changes: 23 additions & 6 deletions Libraries/Opc.Ua.Client/SessionAsync.cs
Expand Up @@ -1332,6 +1332,11 @@
/// <returns>The new session object.</returns>
public static async Task<Session> RecreateAsync(Session sessionTemplate, ITransportChannel transportChannel, CancellationToken ct = default)
{
if (transportChannel == null)
{
return await Session.RecreateAsync(sessionTemplate, ct).ConfigureAwait(false);

Check warning on line 1337 in Libraries/Opc.Ua.Client/SessionAsync.cs

View check run for this annotation

Codecov / codecov/patch

Libraries/Opc.Ua.Client/SessionAsync.cs#L1337

Added line #L1337 was not covered by tests
}

ServiceMessageContext messageContext = sessionTemplate.m_configuration.CreateMessageContext();
messageContext.Factory = sessionTemplate.Factory;

Expand Down Expand Up @@ -1499,15 +1504,27 @@
connection,
transportChannel);

if (!(result is ChannelAsyncOperation<int> operation)) throw new ArgumentNullException(nameof(result));

try
const string timeoutMessage = "ACTIVATE SESSION ASYNC timed out. {0}/{1}";
if (result is ChannelAsyncOperation<int> operation)
{
_ = await operation.EndAsync(kReconnectTimeout / 2, true, ct).ConfigureAwait(false);
try
{
_ = await operation.EndAsync(kReconnectTimeout / 2, true, ct).ConfigureAwait(false);
}
catch (ServiceResultException sre)
{
if (sre.StatusCode == StatusCodes.BadRequestInterrupted)
{
var error = ServiceResult.Create(StatusCodes.BadRequestTimeout, timeoutMessage,
GoodPublishRequestCount, OutstandingRequestCount);
Utils.LogWarning("WARNING: {0}", error.ToString());
operation.Fault(false, error);

Check warning on line 1521 in Libraries/Opc.Ua.Client/SessionAsync.cs

View check run for this annotation

Codecov / codecov/patch

Libraries/Opc.Ua.Client/SessionAsync.cs#L1518-L1521

Added lines #L1518 - L1521 were not covered by tests
}
}

Check warning on line 1523 in Libraries/Opc.Ua.Client/SessionAsync.cs

View check run for this annotation

Codecov / codecov/patch

Libraries/Opc.Ua.Client/SessionAsync.cs#L1523

Added line #L1523 was not covered by tests
}
catch (ServiceResultException)
else if (!result.AsyncWaitHandle.WaitOne(kReconnectTimeout / 2))
{
Utils.LogWarning("WARNING: ACTIVATE SESSION {0} timed out. {1}/{2}", SessionId, GoodPublishRequestCount, OutstandingRequestCount);
Utils.LogWarning(timeoutMessage, GoodPublishRequestCount, OutstandingRequestCount);

Check warning on line 1527 in Libraries/Opc.Ua.Client/SessionAsync.cs

View check run for this annotation

Codecov / codecov/patch

Libraries/Opc.Ua.Client/SessionAsync.cs#L1527

Added line #L1527 was not covered by tests
}

// reactivate session.
Expand Down