Skip to content

Commit

Permalink
Merge pull request #282 from fiskaltrust/#207-DE-Fiskaly-Split-TAR-ex…
Browse files Browse the repository at this point in the history
…port

#207 de fiskaly split tar export
  • Loading branch information
forsthug committed May 16, 2024
2 parents 05dfad4 + 85974ae commit 6f31649
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 27 deletions.
161 changes: 141 additions & 20 deletions scu-de/src/fiskaltrust.Middleware.SCU.DE.FiskalyCertified/FiskalySCU.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public class FiskalySCU : IDESSCD
{
private readonly ConcurrentDictionary<ulong, long> _transactionsStates;
private readonly ConcurrentDictionary<string, ExportStateData> _readStreamPointer = new ConcurrentDictionary<string, ExportStateData>();
private readonly ConcurrentDictionary<Guid, List<SplitExportStateData>> _splitExports = new ConcurrentDictionary<Guid, List<SplitExportStateData>>();

private readonly ILogger<FiskalySCU> _logger;
private readonly FiskalySCUConfiguration _configuration;
private readonly ClientCache _clientCache;
Expand Down Expand Up @@ -314,10 +316,18 @@ public async Task<StartExportSessionResponse> StartExportSessionAsync(StartExpor
var (from, to) = GetExportRange(tss);

var exportId = Guid.NewGuid();
await _fiskalyApiProvider.RequestExportAsync(_configuration.TssId, exportRequest, exportId, from, to);
await _fiskalyApiProvider.SetExportMetadataAsync(_configuration.TssId, exportId, from, to);
SetExportState(exportId, ExportState.Running);
CacheExportAsync(exportId).ExecuteInBackgroundThread();
var range = (to - from) ?? 0;
if (range > _configuration.MaxExportTransaction)
{
await StartSplitExportSessionAsync(exportRequest, exportId, from ?? 1, to);
}
else
{
await _fiskalyApiProvider.RequestExportAsync(_configuration.TssId, exportRequest, exportId, from, to);
await _fiskalyApiProvider.SetExportMetadataAsync(_configuration.TssId, exportId, from, to);
SetExportState(exportId, ExportState.Running);
CacheExportAsync(exportId).ExecuteInBackgroundThread();
}

return new StartExportSessionResponse
{
Expand All @@ -332,6 +342,42 @@ public async Task<StartExportSessionResponse> StartExportSessionAsync(StartExpor
}
}

private async Task StartSplitExportSessionAsync(ExportTransactions exportRequest, Guid exportId, long from, long to)
{
decimal temp = (to - from) / _configuration.MaxExportTransaction;
var exportCount = Math.Floor(temp) + 1;

for (var i = 1; i <= exportCount; i++)
{
var splitExport = new SplitExportStateData
{
ParentExportId = exportId,
ExportId = Guid.NewGuid(),
From = from,
To = from + _configuration.MaxExportTransaction - 1 < to ? from + _configuration.MaxExportTransaction - 1 : to,
ExportStateData = new ExportStateData
{
ReadPointer = 0,
State = ExportState.Unkown
}
};
_splitExports.AddOrUpdate(exportId, new List<SplitExportStateData> { splitExport }, (key, value) =>
{
value.Add(splitExport);
return value;
});
from = (int) (splitExport.To + 1);
}
var firstExport = _splitExports[exportId].First();
await _fiskalyApiProvider.RequestExportAsync(_configuration.TssId, exportRequest, firstExport.ExportId, firstExport.From, firstExport.To);
await _fiskalyApiProvider.SetExportMetadataAsync(_configuration.TssId, firstExport.ExportId, firstExport.From, firstExport.To);
firstExport.ExportStateData.State = ExportState.Running;
SetExportState(exportId, ExportState.Running);

CacheSplitExportAsync(firstExport, exportRequest, 0).ExecuteInBackgroundThread();
}


private (long? from, long to) GetExportRange(TssDto tss)
{
var from = tss.Metadata.ContainsKey(_lastExportedTransactionNumberKey)
Expand Down Expand Up @@ -371,6 +417,44 @@ private async Task CacheExportAsync(Guid exportId, int currentTry = 0)
}
}

private async Task CacheSplitExportAsync(SplitExportStateData splitExportStateData, ExportTransactions exportRequest, int currentTry = 0)
{
SplitExportStateData nextSplitExport = null;
try
{
await _fiskalyApiProvider.StoreDownloadSplitResultAsync(_configuration.TssId, splitExportStateData);
var export = _splitExports.FirstOrDefault(x => x.Key== splitExportStateData.ParentExportId);
if (export.Value != null)
{
nextSplitExport = export.Value.Where(x => x.ExportStateData.State == ExportState.Unkown).FirstOrDefault();
if (nextSplitExport != null)
{
await _fiskalyApiProvider.RequestExportAsync(_configuration.TssId, exportRequest, nextSplitExport.ExportId, nextSplitExport.From, nextSplitExport.To);
await _fiskalyApiProvider.SetExportMetadataAsync(_configuration.TssId, nextSplitExport.ExportId, nextSplitExport.From, nextSplitExport.To);
nextSplitExport.ExportStateData.State = ExportState.Running;

await CacheSplitExportAsync(nextSplitExport, exportRequest, 0);
}
}
}
catch (WebException)
{
if (_configuration.RetriesOnTarExportWebException > currentTry)
{
currentTry++;
_logger.LogWarning($"WebException on Export from Fiskaly retry {currentTry} from {_configuration.RetriesOnTarExportWebException}, DelayOnRetriesInMs: {_configuration.DelayOnRetriesInMs}.");
await Task.Delay(_configuration.DelayOnRetriesInMs * (currentTry + 1)).ConfigureAwait(false);
await CacheSplitExportAsync(splitExportStateData, exportRequest, currentTry);
}
}
catch (Exception ex)
{

_logger.LogError(ex, "Failed to execute {Operation} - ExportId: {ExportId}", nameof(CacheExportAsync), nextSplitExport.ExportId);
SetExportState(splitExportStateData.ParentExportId, ExportState.Failed, ex);
}
}

private void SetExportState(Guid tokenId, ExportState exportState, Exception error = null)
{
_readStreamPointer.AddOrUpdate(tokenId.ToString(), new ExportStateData
Expand Down Expand Up @@ -471,26 +555,51 @@ public async Task<ExportDataResponse> ExportDataAsync(ExportDataRequest request)
throw new FiskalyException("The export failed to start. It needs to be retriggered");
}
var tempFileName = request.TokenId;
var exportStateInformation = await _fiskalyApiProvider.GetExportStateInformationByIdAsync(_configuration.TssId, Guid.Parse(request.TokenId));
if (exportStateInformation.State == "ERROR")
var exportId = Guid.Parse(request.TokenId);
if (_splitExports.ContainsKey(exportId))
{
throw new FiskalyException($"The export failed with a fiskaly internal error: {exportStateInformation.Exception}");
_readStreamPointer.TryGetValue(request.TokenId, out var exportState);
if (exportState.State != ExportState.Succeeded && exportState.State != ExportState.Failed)
{
var export = _splitExports.FirstOrDefault(x => x.Key == exportId);
if (export.Value != null)
{
if (export.Value.All(x => x.ExportStateData.State == ExportState.Succeeded))
{
SetExportState(export.Key, ExportState.Succeeded);
}
else if (export.Value.Any(x => x.ExportStateData.State == ExportState.Failed))
{
SetExportState(export.Key, ExportState.Failed);
}
}
}
}
if (_readStreamPointer.TryGetValue(request.TokenId, out var exportStateData) && exportStateData.State == ExportState.Failed)
else
{
throw exportStateData.Error;
}

if (exportStateInformation.State != "COMPLETED" || !File.Exists(tempFileName))
{
return new ExportDataResponse
var exportStateInformation = await _fiskalyApiProvider.GetExportStateInformationByIdAsync(_configuration.TssId, Guid.Parse(request.TokenId));
if (exportStateInformation.State == "ERROR")
{
TokenId = request.TokenId,
TotalTarFileSize = -1,
TarFileEndOfFile = false,
TotalTarFileSizeAvailable = false
};
throw new FiskalyException($"The export failed with a fiskaly internal error: {exportStateInformation.Exception}");
}
if (_readStreamPointer.TryGetValue(request.TokenId, out var exportStateDat) && exportStateDat.State == ExportState.Failed)
{
throw exportStateDat.Error;
}

if (exportStateInformation.State != "COMPLETED" || !File.Exists(tempFileName))
{
return new ExportDataResponse
{
TokenId = request.TokenId,
TotalTarFileSize = -1,
TarFileEndOfFile = false,
TotalTarFileSizeAvailable = false
};
}
}
_readStreamPointer.TryGetValue(request.TokenId, out var exportStateData);
if (exportStateData.State != ExportState.Succeeded || !File.Exists(tempFileName))
{
return new ExportDataResponse
Expand Down Expand Up @@ -570,11 +679,21 @@ public async Task<EndExportSessionResponse> EndExportSessionAsync(EndExportSessi
}
else
{
var metadata = await _fiskalyApiProvider.GetExportMetadataAsync(_configuration.TssId, Guid.Parse(request.TokenId));
Dictionary<string, object> metadata;
if (_splitExports.ContainsKey(Guid.Parse(request.TokenId)))
{
var export = _splitExports.FirstOrDefault(x => x.Key == Guid.Parse(request.TokenId));
var lastExport = export.Value.Last();
metadata = await _fiskalyApiProvider.GetExportMetadataAsync(_configuration.TssId, lastExport.ExportId);
}
else
{
metadata = await _fiskalyApiProvider.GetExportMetadataAsync(_configuration.TssId, Guid.Parse(request.TokenId));
}
if (metadata.ContainsKey("end_transaction_number"))
{
await SetLastExportedTransactionNumber(Convert.ToInt64(metadata["end_transaction_number"], CultureInfo.InvariantCulture));
sessionResponse.IsErased = true;
}
}
Expand All @@ -593,6 +712,8 @@ public async Task<EndExportSessionResponse> EndExportSessionAsync(EndExportSessi
}
finally
{

_splitExports.TryRemove(Guid.Parse(request.TokenId), out _);
try
{
if (File.Exists(tempFileName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ public class FiskalySCUConfiguration
public int RetriesOn5xxError { get; set; } = 2;
public int RetriesOnTarExportWebException{ get; set; } = 2;
public int DelayOnRetriesInMs { get; set; } = 1000;
public long MaxExportTransaction { get; set; } = 320000;// 800000 / 2.5
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;

namespace fiskaltrust.Middleware.SCU.DE.FiskalyCertified.Models
{
public class SplitExportStateData
{
public Guid ParentExportId { get; set; }
public Guid ExportId { get; set; }
public ExportStateData ExportStateData { get; set; }
public long From { get; set; }
public long To { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
using fiskaltrust.Middleware.SCU.DE.FiskalyCertified.Models;
using Newtonsoft.Json;
using fiskaltrust.Middleware.SCU.DE.FiskalyCertified.Helpers;
using Microsoft.Extensions.Logging;
using fiskaltrust.Middleware.SCU.DE.Helpers.TLVLogParser.Tar;

namespace fiskaltrust.Middleware.SCU.DE.FiskalyCertified.Services
{
Expand All @@ -21,6 +21,7 @@ public sealed class FiskalyV2ApiProvider : IFiskalyApiProvider, IDisposable
private readonly HttpClientWrapper _httpClient;
private readonly JsonSerializerSettings _serializerSettings;


public FiskalyV2ApiProvider(FiskalySCUConfiguration configuration, HttpClientWrapper httpClientWrapper)
{
_configuration = configuration;
Expand Down Expand Up @@ -95,11 +96,11 @@ public async Task<ExportStateInformationDto> GetExportStateInformationByIdAsync(
{
return JsonConvert.DeserializeObject<ExportStateInformationDto>(responseContent);
}

throw new FiskalyException($"Communication error ({response.StatusCode}) while getting export state information (GET tss/{tssId}/export/{exportId}). Response: {responseContent}",
(int) response.StatusCode, $"GET tss/{tssId}/export/{exportId}");
(int) response.StatusCode, $"GET tss/{tssId}/export/{exportId}");
}


public async Task<Dictionary<string, object>> GetExportMetadataAsync(Guid tssId, Guid exportId)
{
var response = await _httpClient.GetAsync($"tss/{tssId}/export/{exportId}/metadata");
Expand All @@ -111,6 +112,7 @@ public async Task<ExportStateInformationDto> GetExportStateInformationByIdAsync(

throw new FiskalyException($"Communication error ({response.StatusCode}) while getting export metadata (GET tss/{tssId}/export/{exportId}/metadata). Response: {responseContent}",
(int) response.StatusCode, $"GET tss/{tssId}/export/{exportId}/metadata");

}

public async Task RequestExportAsync(Guid tssId, ExportTransactions exportRequest, Guid exportId, long? fromTransactionNumber, long toTransactionNumber)
Expand Down Expand Up @@ -189,8 +191,26 @@ public async Task RequestExportAsync(Guid tssId, ExportTransactionsWithDatesDto
public async Task StoreDownloadResultAsync(Guid tssId, Guid exportId)
{
var exportStateInformation = await WaitUntilExportFinished(tssId, exportId);
var contentStream = await GetExportByExportStateAsync(exportStateInformation);

using var fileStream = File.Create(exportId.ToString());
contentStream.CopyTo(fileStream);
}

public async Task StoreDownloadSplitResultAsync(Guid tssId, SplitExportStateData splitExportStateData)
{
var exportStateInformation = await WaitUntilExportFinished(tssId, splitExportStateData.ExportId);
var result = await GetExportByExportStateAsync(exportStateInformation);
File.WriteAllBytes(exportId.ToString(), result);

if (exportStateInformation.State == "COMPLETED")
{
TarFileHelper.AppendTarStreamToTarFile(splitExportStateData.ParentExportId.ToString(), result);
splitExportStateData.ExportStateData.State = ExportState.Succeeded;
}
else
{
splitExportStateData.ExportStateData.State = ExportState.Failed ;
}
}

private async Task<ExportStateInformationDto> WaitUntilExportFinished(Guid tssId, Guid exportId)
Expand Down Expand Up @@ -230,12 +250,12 @@ public async Task<TssDto> GetTseByIdAsync(Guid tssId)
(int) response.StatusCode, $"GET tss/{tssId}");
}

public async Task<byte[]> GetExportByExportStateAsync(ExportStateInformationDto exportStateInformation)
public async Task<Stream> GetExportByExportStateAsync(ExportStateInformationDto exportStateInformation)
{
var response = await _httpClient.GetAsync($"tss/{exportStateInformation.TssId}/export/{exportStateInformation.Id}/file");
if (response.IsSuccessStatusCode)
{
return await response.Content.ReadAsByteArrayAsync();
return await response.Content.ReadAsStreamAsync();
}

throw new FiskalyException($"Communication error ({response.StatusCode}) while downloading TAR export (GET tss/{exportStateInformation.TssId}/export/{exportStateInformation.Id}/file).",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using fiskaltrust.Middleware.SCU.DE.FiskalyCertified.Models;

Expand All @@ -9,7 +11,7 @@ public interface IFiskalyApiProvider
{
Task CreateClientAsync(Guid tssId, string serialNumber, Guid clientId);
Task<List<ClientDto>> GetClientsAsync(Guid tssId);
Task<byte[]> GetExportByExportStateAsync(ExportStateInformationDto exportStateInformation);
Task<Stream> GetExportByExportStateAsync(ExportStateInformationDto exportStateInformation);
Task<Dictionary<string, object>> GetExportMetadataAsync(Guid tssId, Guid exportId);
Task<ExportStateInformationDto> GetExportStateInformationByIdAsync(Guid tssId, Guid exportId);
Task<IEnumerable<TransactionDto>> GetStartedTransactionsAsync(Guid tssId);
Expand All @@ -24,6 +26,7 @@ public interface IFiskalyApiProvider
Task RequestExportAsync(Guid tssId, ExportTransactionsWithDatesDto exportRequest, Guid exportId);
Task SetExportMetadataAsync(Guid tssId, Guid exportId, long? fromTransactionNumber, long toTransactionNumber);
Task StoreDownloadResultAsync(Guid tssId, Guid exportId);
Task StoreDownloadSplitResultAsync(Guid tssId, SplitExportStateData splitExportStateData);
Task PatchTseMetadataAsync(Guid tssId, Dictionary<string, object> metadata);
Task DisableClientAsync(Guid tssId, string serialNumber, Guid clientId);
}
Expand Down

0 comments on commit 6f31649

Please sign in to comment.