Handling multiline text column #422

Open
mkshanid opened this issue Mar 8, 2023 · 10 comments
@mkshanid

mkshanid commented Mar 8, 2023

How can we handle a pipe-separated CSV that has a text column containing multiline text? The text delimiter is ".

With the example given in the documentation, it throws the error "could not deserialize value" because of the new line in the text column.

@paillave
Owner

paillave commented Mar 9, 2023

Unfortunately, the header cannot be parsed if it spans multiple lines.

If you know for sure how many lines your header takes, and if the position of each column is always the same, you can mention the column index instead of the column name. Then you can ask to ignore the first X lines of your CSV file.

The mapping would look like the following:

stream.CrossApplyTextFile("parse file", o => o
  .UseMap(m => new 
    {
      Value1 = m.ToColumn(0),
      Value2 = m.ToDateColumn(1, "ddMMyyyy"),
      Value3 = m.ToNumberColumn<decimal?>(2, ".")
    })
  // Skip the parsing of the header. The global parsing won't need it as no column relies on its name
  .IgnoreFirstLines(2) 
);

Let me know if it works (it should!).
If it's unclear, send me an example of your input file; I'll provide you with a way to handle it.

@paillave self-assigned this Mar 9, 2023
@paillave added the help wanted, question, and documentation labels Mar 9, 2023
@mkshanid
Author

Sorry for the confusion.

The multiline text is not in the header but in the column values. For example, my CSV looks like this:

Code|Name|Description
1000|Peter|"10 year experienced with
high level knowledge in ..."

Here the 'Description' column value is multiline. Also, we cannot predict how many lines each column value will take.

@paillave added the bug label and removed the help wanted, question, and documentation labels Mar 13, 2023
@paillave
Owner

paillave commented Mar 13, 2023

Sorry for the confusion. At the moment, native support for multiline values doesn't exist. As a matter of fact, the parsing algorithm would need to be drastically changed.
I classified it as a bug, but I may not work on it very quickly.
In the meantime, if you find a parser for such CSV files online (https://joshclose.github.io/CsvHelper/ handles what you need), you can call it within a CrossApply operator. The way to get the stream from an IFileValue is GetStream. Then, if you want to issue several values from one value, follow what is mentioned on this page: https://paillave.github.io/Etl.Net/docs/recipes/getManyData

@paillave
Owner

paillave commented Mar 13, 2023

Note for myself: maybe use this class https://github.com/JoshClose/CsvHelper/blob/master/src/CsvHelper/CsvParser.cs as a replacement for the CSV parsing.
Consider this library as well: https://github.com/nreco/csv

@mkshanid
Author

mkshanid commented Apr 7, 2023

Hi,

Thanks for the suggestion.
I added the CsvHelper parser via an extension method and it worked for me.

I tested the code with a large table (6 columns, numeric & GUID) having 4.3 million records, deployed on a Linux server. It took around 13 hours to complete the load from the CSV (inside a zip file) to the SQL database.

Is this the expected time for such a large table, or can I improve the performance by optimizing?

My code looks like this:

contextStream
  .CrossApplyFolderFiles("list all required files", (p) => folderPath, $"PRF.zip")
  .CrossApplyZipFiles("extract files from zip", $"*.csv")
  .CrossApplyWithCsvHelper<PRF_ENTITY>("parse file", ",")
  .SqlServerSave("save in DB", o => o
      .ToTable($"PRF")
  )
  .Do("display ids on console", (i) =>
  {
      Console.WriteLine($"PRF {i.product_guid}");
  });

where CrossApplyWithCsvHelper is the new extension method I have written with CsvHelper.

public static IStream<TOut> CrossApplyWithCsvHelper<TOut>(this IStream<IFileValue> stream, string name, string delimitter = "|", bool noParallelisation = false)
{
    return stream.CrossApply<IFileValue, TOut>("parse file", (fileValue, dependencyResolver, cancellationToken, push) =>
    {
        var config = new CsvConfiguration(CultureInfo.InvariantCulture)
        {
            Mode = CsvMode.RFC4180,
            Delimiter = delimitter,
            LineBreakInQuotedFieldIsBadData = false,
        };

        config.BadDataFound = null;
        config.MissingFieldFound = null;

        using (var reader = new StringReader(new StreamReader(fileValue.GetContent()).ReadToEnd()))
        using (var csv = new CsvReader(reader, config))
        {
            var records = csv.GetRecords<TOut>();
            foreach (var firm in records)
            {
                if (cancellationToken.IsCancellationRequested)
                    return;
                push(firm);
            }
        }
    });
}

@paillave
Owner

paillave commented Apr 7, 2023

That is definitely not the expected time. Pushing millions of rows directly to SQL Server should be a matter of minutes. I will have a closer look at your code and come back to you.

@paillave
Owner

paillave commented Apr 7, 2023

Improvements I can think of:

Submit rows as they are parsed

Here, you are loading and parsing the entire file in memory before submitting its rows to the output stream. You should submit rows one by one as they are parsed:

public static IStream<TOut> CrossApplyWithCsvHelper<TOut>(this IStream<IFileValue> stream, string name,string delimitter = "|", bool noParallelisation = false)
{
    return stream.CrossApply<IFileValue, TOut>("parse file", (fileValue, dependencyResolver, cancellationToken, push) =>
    {
        var config = new CsvConfiguration(CultureInfo.InvariantCulture)
        {
            Mode = CsvMode.RFC4180,
            Delimiter = delimitter,
            LineBreakInQuotedFieldIsBadData = false,
        };

        config.BadDataFound = null;
        config.MissingFieldFound = null;

        using (var reader = new StringReader(new StreamReader(fileValue.GetContent()).ReadToEnd()))
        using (var csv = new CsvReader(reader, config))
        {
            csv.Read();
            csv.ReadHeader();
            while (csv.Read() && !cancellationToken.IsCancellationRequested)
            {
                push(csv.GetRecord<TOut>());
            }
        }
    });
}

Use bulkload

At the moment, the SqlServer extensions for dotnet don't leverage the power of bulk load. Therefore (still at the moment), rows are saved one by one using a merge statement to proceed with an upsert.
You may want to use the EfCore extensions for dotnet instead, as its save operator uses bulk-load capabilities by default. This drastically accelerates the saving time.
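
For illustration, here is a minimal sketch of what an EfCore-based save could look like, assuming the EfCoreSave operator with a SeekOn option from the EntityFrameworkCore extensions is available and configured; the exact operator and option names should be checked against the portal documentation.

contextStream
  .CrossApplyFolderFiles("list all required files", (p) => folderPath, $"PRF.zip")
  .CrossApplyZipFiles("extract files from zip", $"*.csv")
  .CrossApplyWithCsvHelper<PRF_ENTITY>("parse file", ",")
  // Hypothetical: the EfCore extensions' save operator uses bulk load by default;
  // PRF_ENTITY is assumed to be an EF Core entity mapped to the PRF table.
  .EfCoreSave("save in DB", o => o
      .SeekOn(i => i.product_guid) // key used to locate the existing row for the upsert
  );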

Indexes

13 hours to save a couple of million rows is absolutely not acceptable. It is also possible that the existing table you are inserting into doesn't have the proper indexes, and that the database has to make an index scan or even a table scan to find the row that may need to be updated. The index you need to create is on the key that is used to find the corresponding row. If none is mentioned, the primary key(s) will be used.
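
As an illustration only, assuming the upsert seeks on product_guid as in the earlier example (the table, column, and connection below are placeholders):

// Hypothetical sketch: create an index on the seek key so the MERGE can locate the
// existing row with an index seek instead of a table scan. PRF and product_guid come
// from the earlier example; 'connection' is assumed to be an open SqlConnection.
using (var command = new SqlCommand(
    "CREATE INDEX IX_PRF_product_guid ON dbo.PRF (product_guid);", connection))
{
    command.ExecuteNonQuery();
}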

Use insert only

Use insert only if you don't need to make an upsert. This will blindly do an INSERT INTO instead of a MERGE that needs to seek the potentially existing row.
You will find the option to proceed with insert only in the save operator.

@mkshanid
Author

Thanks for the suggestion.

Can you give me an example of the insert only option? I couldn't find such an option in the save operator.

@paillave
Owner

Oops, sorry, indeed, at the moment this option only exists in the EFCore extensions. Did you give the EFCore extensions a try? Because 13 hours to upsert millions of rows seems like REALLY a lot!
At some point (I can't say when yet), the SqlServer extensions and the EFCore extensions will share the same foundations to proceed with a bulk-load upsert. For now, this very high upsert performance is available in the EfCore extensions only.

@mkshanid
Author

mkshanid commented Apr 26, 2023

No. Moving to EFCore was a little difficult for me since I would need to rewrite code in different places. Instead, I tried to make the bulk insert work with the current implementation: I added an extension method for SqlBulkInsert and the performance improved a lot. But I have another challenge with memory usage: it reaches almost 100% of a 40 GB RAM size. I suspect the memory is not released even after one table load is completed, and it keeps growing for each table until all the tables are loaded. My code looks like below.

var grp4_process1 = contextStream
    .Do("start", p => logger.LogInformation($"started Table1"))
    .CrossApplyFolderFiles("list all required files", (p) => folderPath, fileFormat)
    .CrossApplyZipFiles("extract files from zip", "Table1.txt")
    .CrossApplyWithCsvHelper<Table1>("parse file inside the zip")
    .ToList("get list data for individual file")
    .WaitWhenDone("wait until all files grp4_processed")
    .SqlBulkInsert("bulk load the combined list", "Table1", i => new { FirmId = i["FirmId"], FirmGuid = i["FirmGuid"] });

// Table2
logger.LogInformation($"Processing for the file Table2");

var grp4_process2 = contextStream
    .WaitWhenDone("starting process 2", grp4_process1)
    .Do("start", p => logger.LogInformation($"started Table2"))
    .CrossApplyFolderFiles("list all required files", (p) => folderPath, fileFormat)
    .CrossApplyZipFiles("extract files from zip", "Table2.txt")
    .CrossApplyWithCsvHelper<Table2>("parse file inside the zip")
    .ToList("get list data for individual file")
    .WaitWhenDone("wait until all files grp4_processed")
    .SqlBulkInsert("bulk load the combined list", "Table2", i => new { FirmID = i["FirmID"], FirmGuid = i["FirmGuid"] });

.
.
.
continues...
			

and the SqlBulkInsert function looks like this

public static IStream<List<TIn>> SqlBulkInsert<TIn>(this IStream<List<TIn>> stream, string name, string _tableName, Func<DataRow, dynamic> keys = null)
{
    return stream.Load(name, (list, resolver) =>
    {
        var con = resolver.Resolve<SqlConnection>();
        var _logger = resolver.Resolve<ILogger>();
        using (var copy = new SqlBulkCopy(con))
        {
            copy.BulkCopyTimeout = 0;
            copy.DestinationTableName = _tableName;
            try
            {
                using (var table = list.ToDataTable(keys))
                {
                    copy.WriteToServer(table);
                }
                _logger.LogInformation($"SqlBulkInsert completed for {_tableName}");
            }
            catch (Exception ex)
            {
                _logger.LogError("Error in bulk save for the table: " + _tableName);
                _logger.LogError("Error: " + ex.ToString());
                throw; // rethrow without resetting the stack trace
            }
        }
    });
}

public static IStream<TIn> Load<TIn>(this IStream<TIn> stream, string name, Action<TIn, IDependencyResolver> processRow)
{
    return new DoStreamNode<TIn, IStream<TIn>>(name, new DoArgs<TIn, IStream<TIn>>
    {
        Processor = new BulkLoadProcessor<TIn, TIn>((TIn i) => i, processRow),
        Stream = stream
    }).Output;
}

where BulkLoadProcessor is an implementation of IDoProcessor:


public class BulkLoadProcessor<TIn, TInnerIn> : IDoProcessor<TIn>
{
    private readonly Func<TIn, TInnerIn> _getInner;

    private readonly Action<TInnerIn, IDependencyResolver> _processRow;

    public BulkLoadProcessor(Func<TIn, TInnerIn> getInner, Action<TInnerIn, IDependencyResolver> processRow)
    {
        _processRow = processRow;
        _getInner = getInner;
    }

    public void ProcessRow(TIn value, CancellationToken cancellationToken, IDependencyResolver resolver, IInvoker invoker)
    {
        _processRow(_getInner(value), resolver);
    }
}

Please let me know if you can find any problem that causes the memory leak.
