Warehousing - copy data from one database to another #399

Open
szum7 opened this issue Dec 20, 2022 · 15 comments
Labels
documentation Documentation may be added or completed on the portal help wanted Extra attention is needed

Comments

@szum7

szum7 commented Dec 20, 2022

Hi, can you read data from a source database and copy it to another (target) database? I've read and tried the examples in the documentation, but reading and saving always happen in the same database, even though the documentation mentions two args parameters.

Please provide example code if it's possible.

@paillave
Owner

You can do it by injecting keyed EF Core contexts into the dependency resolver. Every EF Core operator supports an extra parameter to pass the key of the connection/DbContext.
I will provide an example later on.

@paillave
Owner

Please provide the source code that has the issue so that I can help you as well as possible.

@szum7
Author

szum7 commented Dec 20, 2022

Here's my test project.
The EF context's connection string points to the DataWarehouseSource1 database.
processRunner.ExecuteAsync receives the other connection string, pointing to DataWarehouseTarget1.
However, this reads from and writes to the same database instead of copying from source to target.

Program.cs

using Paillave.Etl.Core;
using Paillave.Etl.Reactive.Operators;
using DataWarehouseSyncV1.Context;
using Microsoft.EntityFrameworkCore;
using Paillave.Etl.EntityFrameworkCore;

namespace DataWarehouseSyncV1
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var target = @"Data Source=(localdb)\mssqllocaldb;Initial Catalog=DataWarehouseTarget1;Integrated Security=True;MultipleActiveResultSets=True;";

            var processRunner = StreamProcessRunner.Create<string>(DefineProcess);
            using (var dbCtx = new WarehouseContext())
            {
                var executionOptions = new ExecutionOptions<string>
                {
                    Resolver = new SimpleDependencyResolver().Register<DbContext>(dbCtx),
                };
                var res = await processRunner.ExecuteAsync(target, executionOptions);
            }
        }

        private static void DefineProcess(ISingleStream<string> contextStream)
        {
            contextStream
                .EfCoreSelectSingle("get data", (o, row) => o
                    .Set<SimpleTable>())
                .EfCoreSave("save data")
                .Do("show value on console", i => Console.WriteLine(i.FirstName));
        }
    }
}

WarehouseContext.cs

using Microsoft.EntityFrameworkCore;

namespace DataWarehouseSyncV1.Context
{
    class WarehouseContext : DbContext
    {
        public DbSet<SimpleTable> SimpleTables { get; set; }

        protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        {
            optionsBuilder.UseSqlServer(@"Data Source=(localdb)\mssqllocaldb;Initial Catalog=DataWarehouseSource1;Integrated Security=True;MultipleActiveResultSets=True;");
        }
    }
}

SimpleTable.cs

using System.ComponentModel.DataAnnotations;

namespace DataWarehouseSyncV1.Context
{
    public class SimpleTable
    {
        [Key]
        public int Id { get; set; }
        public string? FirstName { get; set; }
        public string? LastName { get; set; }
        public int? Age { get; set; }
    }
}

@paillave
Owner

paillave commented Dec 20, 2022

As I mentioned in my first answer, you must inject your DbContexts with keys.
Example of injecting DbContexts:

            var processRunner = StreamProcessRunner.Create<string[]>(TestImport2.Import);
            var executionOptions = new ExecutionOptions<string[]>
            {
                Resolver = new SimpleDependencyResolver()
                    .Register(new DataAccess.TestDbContext1(), "CNX1")
                    .Register(new DataAccess.TestDbContext2(), "CNX2")
            };
            var res = await processRunner.ExecuteAsync(args, executionOptions);

Then, in your stream, refer to the right keyed context using WithKeyedConnection, as in the following example:

                .EfCoreSave("Save composition", o => o
                    .WithKeyedConnection("CNX1")
                    .Entity(i => i.Composition)
                    .SeekOn(i => new { i.Date, i.PortfolioId })
                    .DoNotUpdateIfExists()
                    .Output((i, e) => new
                    {
                        i.Portfolio,
                        Composition = e
                    }));

@paillave
Owner

paillave commented Dec 20, 2022

If you cannot get your code sample working with these details, I will find time to rework your code as soon as possible.

@paillave paillave self-assigned this Dec 20, 2022
@paillave paillave added documentation Documentation may be added or completed on the portal help wanted Extra attention is needed labels Dec 20, 2022
@szum7
Author

szum7 commented Jan 2, 2023

Based on your answer I've managed to make the "read from Native SQL db1 and write to Native SQL db2" solution work. But I'm still unable to do it with Entity Framework.
I tried:

  • read from EF Core db1 and write to Native SQL db2
  • read from EF Core db1 and write to EF Core db2

Neither of these worked for me. Could you please help and give me a working example? The easiest to understand would be a rework of my very minimalistic solution from 12/20/2022.

For reference, here's the working "read from Native SQL db1 and write to Native SQL db2" solution:

using Paillave.Etl.Core;
using Paillave.Etl.SqlServer;
using System.Data.SqlClient;
using Paillave.Etl.Reactive.Operators;
using DataWarehouseSyncV1.Context;

namespace SolutionNativeSqlNamespace
{
    class SolutionNativeSql
    {
        public static async Task Run()
        {
            var source = @"Data Source=(localdb)\mssqllocaldb;Initial Catalog=DataWarehouseSource1;Integrated Security=True;MultipleActiveResultSets=True;";
            var target = @"Data Source=(localdb)\mssqllocaldb;Initial Catalog=DataWarehouseTarget1;Integrated Security=True;MultipleActiveResultSets=True;";

            var processRunner = StreamProcessRunner.Create<string>(DefineProcessSimpleTable);

            using (var sourceConnection = new SqlConnection(source))
            using (var targetConnection = new SqlConnection(target))
            {
                sourceConnection.Open();
                targetConnection.Open();
                var executionOptions = new ExecutionOptions<string> 
                { 
                    Resolver = new SimpleDependencyResolver()
                        .Register(sourceConnection, "SourceDbKey") 
                        .Register(targetConnection, "TargetDbKey") 
                };

                var res = await processRunner.ExecuteAsync(target, executionOptions);
                Console.Write(res.Failed ? "Failed" : "Succeeded");
                if (res.Failed)
                    Console.Write($"{res.ErrorTraceEvent.NodeName}({res.ErrorTraceEvent.NodeTypeName}):{res.ErrorTraceEvent.Content.Message}");
            }
        }

        private static void DefineProcessSimpleTable(ISingleStream<string> contextStream)
        {
            contextStream
                .CrossApplySqlServerQuery("Get data", o => o
                    .WithKeyedConnection("SourceDbKey")
                    .FromQuery("SELECT * FROM dbo.Persons")
                    .WithMapping<Person>())
                    .SqlServerSave("SqlServerSave", o => o
                        .WithConnection("TargetDbKey")
                        .ToTable("dbo.Persons")
                        .DoNotSave(p => p.Id))
                        .Do("Show values on console", i => Console.WriteLine($"{i.FirstName} ({i.LastName})"));
        }
    }
}

(Sorry for replying so late, I was unavailable due to the holidays.)

@paillave
Owner

paillave commented Jan 2, 2023

For Entity Framework reads/writes to work, you must inject the related DbContext(s).

@szum7
Author

szum7 commented Jan 3, 2023

Yes, I did that already, I've injected the related DbContexts. Here's my code for reference:

using Microsoft.EntityFrameworkCore;
using Paillave.Etl.Core;
using Paillave.Etl.Reactive.Operators;
using Paillave.Etl.EntityFrameworkCore;

namespace SolutionEFToEFNamespace
{
    class Constants
    {
        public static readonly string SOURCE_CONNECTIONSTRING = @"Data Source=(localdb)\mssqllocaldb;Initial Catalog=DataWarehouseSource1;Integrated Security=True;MultipleActiveResultSets=True;";
        public static readonly string TARGET_CONNECTIONSTRING = @"Data Source=(localdb)\mssqllocaldb;Initial Catalog=DataWarehouseTarget1;Integrated Security=True;MultipleActiveResultSets=True;";
    }

    public class Person
    {
        public int Id { get; set; }
        public string? FirstName { get; set; }
        public string? LastName { get; set; }
        public int? Age { get; set; }
        public override string ToString() => $"{Id}, {FirstName}, {LastName}, {Age}";
    }

    class DbContextSource : DbContext
    {
        public DbSet<Person> Persons { get; set; }

        protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        {
            optionsBuilder.UseSqlServer(Constants.SOURCE_CONNECTIONSTRING);
        }
    }

    class DbContextTarget : DbContext
    {
        public DbSet<Person> Persons { get; set; }

        protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        {
            optionsBuilder.UseSqlServer(Constants.TARGET_CONNECTIONSTRING);
        }
    }

    class SolutionEFToEF
    {
        public static async Task Run()
        {
            var processRunner = StreamProcessRunner.Create<string>(DefineProcess);
            using (var contextSource = new DbContextSource())
            using (var contextTarget = new DbContextTarget())
            {
                var executionOptions = new ExecutionOptions<string>
                {
                    Resolver = new SimpleDependencyResolver()
                        .Register(contextSource)
                        //.Register(contextSource, "DbCtxSource") // I cannot use this, because then not even the read works
                        .Register(contextTarget, "DbCtxTarget"),
                };
                var res = await processRunner.ExecuteAsync(Constants.TARGET_CONNECTIONSTRING, executionOptions);
            }
        }

        private static void DefineProcess(ISingleStream<string> contextStream)
        {
            contextStream
                .EfCoreSelect("get data", (o, row) => o
                    // I cannot specify a connection here, no .WithKeyedConnection or such method as far as I know
                    .Set<Person>()
                    .Select(x => new Person
                    {
                        Id = x.Id,
                        FirstName = x.FirstName,
                        LastName = x.LastName,
                        Age = x.Age
                    })
                )
                .EfCoreSave("save data", o => o
                    .WithKeyedConnection("DbCtxTarget") // doesn't seem to do anything, doesn't connect to the target db
                    .Entity(i => new Person { FirstName = i.FirstName, LastName = i.LastName, Age = i.Age })
                    //.WithMode(SaveMode.EntityFrameworkCore) // tried this also, didn't work
                )
                .Do("show value on console", i => Console.WriteLine(i.ToString()));
        }
    }
}

Program.cs:

using SolutionEFToEFNamespace;

namespace DataWarehouseSyncV1
{
    class Program
    {
        static async Task Main(string[] args)
        {
            await SolutionEFToEF.Run();
        }
    }
}

I'd like to read from a source db using EF Core and write to a target db using EF Core.
I'm only able to read from the source db; I still cannot write to the target db. (I'm also able to write to the source db, which is not my goal.)

@paillave
Owner

paillave commented Jan 3, 2023

I see. I still need to refactor the EF Core extensions a little for better consistency with the other extensions.
Currently, the connectionKey is still a direct parameter of the EfCoreSelect* methods. I will have to make it work the same way as everything else and mark the current way as obsolete.

https://github.com/paillave/Etl.Net/blob/master/src/Paillave.Etl.EntityFrameworkCore/EntityFrameworkCoreRead.Stream.ex.cs
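Going by the owner's description of the current API, where the key is passed to the EfCoreSelect* methods as a direct connectionKey argument (the same argument that appears in a later comment in this thread), the source context does not have to stay unkeyed. The sketch below keys both contexts and points each operator at its own key. It assumes the Constants, Person, DbContextSource and DbContextTarget classes from the EF Core to EF Core code above, the pre-refactoring EfCoreSelect signature, and a target database that already has a Persons table; SolutionEFToEFKeyed is a hypothetical class name.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using Paillave.Etl.Core;
using Paillave.Etl.Reactive.Operators;
using Paillave.Etl.EntityFrameworkCore;

namespace SolutionEFToEFNamespace
{
    // Hypothetical variant of SolutionEFToEF in which BOTH DbContexts are keyed.
    // Assumes Constants, Person, DbContextSource and DbContextTarget from the code above.
    class SolutionEFToEFKeyed
    {
        public static async Task<bool> Run()
        {
            var processRunner = StreamProcessRunner.Create<string>(DefineProcess);
            using (var contextSource = new DbContextSource())
            using (var contextTarget = new DbContextTarget())
            {
                var executionOptions = new ExecutionOptions<string>
                {
                    Resolver = new SimpleDependencyResolver()
                        .Register(contextSource, "DbCtxSource")
                        .Register(contextTarget, "DbCtxTarget"),
                };
                // The trigger value is not read by this process; any string will do.
                var res = await processRunner.ExecuteAsync("copy persons", executionOptions);
                if (res.Failed)
                    Console.WriteLine($"{res.ErrorTraceEvent.NodeName}({res.ErrorTraceEvent.NodeTypeName}):{res.ErrorTraceEvent.Content.Message}");
                return !res.Failed;
            }
        }

        private static void DefineProcess(ISingleStream<string> contextStream)
        {
            contextStream
                // Read through the context registered under "DbCtxSource"
                .EfCoreSelect("get data", (o, row) => o
                    .Set<Person>()
                    .Select(x => new Person { Id = x.Id, FirstName = x.FirstName, LastName = x.LastName, Age = x.Age }),
                    connectionKey: "DbCtxSource")
                // Write through the context registered under "DbCtxTarget";
                // Id is left unset so the target database generates it
                .EfCoreSave("save data", o => o
                    .WithKeyedConnection("DbCtxTarget")
                    .Entity(i => new Person { FirstName = i.FirstName, LastName = i.LastName, Age = i.Age }))
                .Do("show value on console", i => Console.WriteLine(i.ToString()));
        }
    }
}
```

Keying both contexts means no operator can silently fall back to an unkeyed default context, which is one way a read and a save can end up in the same database.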

@szum7
Author

szum7 commented Jan 3, 2023

I'm sorry, there was a mistake on my part, and the solution in my previous comment (EF Core to EF Core) does work.

The issue was that I had not set up the target database with EF Core. Running "Add-Migration" and "Update-Database" on the target context (target database) solved the issue.
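For a throwaway target database, EF Core's Database.EnsureCreated() is a possible alternative to running Add-Migration and Update-Database. The sketch below assumes the DbContextTarget class from the EF Core to EF Core code above; TargetSchema is a hypothetical helper name. A database created this way does not use migrations and cannot later be updated with them, so it is best kept to quick tests.

```csharp
using Microsoft.EntityFrameworkCore;

namespace SolutionEFToEFNamespace
{
    // Hypothetical helper: creates the target schema without migrations (quick tests only).
    static class TargetSchema
    {
        public static bool EnsureTargetCreated()
        {
            using var contextTarget = new DbContextTarget();
            // Creates the database and its tables from the model if they are missing.
            // Returns false when nothing had to be created; an existing schema is
            // NOT compared with or updated to match the model.
            return contextTarget.Database.EnsureCreated();
        }
    }
}
```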

I think this GitHub issue can be closed, since injecting the DbContexts was the correct answer.

If anything, updated documentation for the library (https://paillave.github.io/Etl.Net/docs/intro) would be much appreciated. I couldn't find anything there that explains how a source and a target database connection come into play. The closest I've found is the args[0] and args[1] parameters, but I have yet to figure out what args[0] does.
https://paillave.github.io/Etl.Net/
var res = await processRunner.ExecuteAsync(args[0], executionOptions);

@paillave
Owner

paillave commented Jan 3, 2023

The first parameter of ExecuteAsync is the payload: the single value issued by the trigger stream. See the documentation for this: https://paillave.github.io/Etl.Net/docs/recipes/useExternalData#from-trigger-stream
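To see the trigger stream in isolation, the sketch below hands a value to ExecuteAsync and prints it from contextStream. It is a minimal illustration that assumes, as described above, that contextStream issues exactly that one value; TriggerValueNamespace and TriggerValueDemo are hypothetical names. In the EF Core code earlier in this thread, the trigger value was the target connection string, which the process never read, so it had no effect on which database was used.

```csharp
using System;
using System.Threading.Tasks;
using Paillave.Etl.Core;
using Paillave.Etl.Reactive.Operators;

namespace TriggerValueNamespace
{
    // Hypothetical demo: shows what the first ExecuteAsync argument becomes.
    class TriggerValueDemo
    {
        public static async Task<bool> Run(string[] args)
        {
            var processRunner = StreamProcessRunner.Create<string>(DefineProcess);
            var executionOptions = new ExecutionOptions<string> { Resolver = new SimpleDependencyResolver() };
            // args[0] is the payload: the single value issued by contextStream.
            var res = await processRunner.ExecuteAsync(args[0], executionOptions);
            return !res.Failed;
        }

        private static void DefineProcess(ISingleStream<string> contextStream)
        {
            // Prints the one value received from ExecuteAsync.
            contextStream.Do("show trigger value", value => Console.WriteLine($"Trigger value: {value}"));
        }
    }
}
```

Typical uses for this value are run-time settings such as an input folder or a date, which operators can read through the row argument of their lambdas.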

As with most question issues, I'll leave this issue open so that I can track the documentation I still need to write.

@AhmedJoum

Hello, I tried to move data between two databases but it didn't work. I checked SQL Server Profiler and found that it reads the data, creates a temp table and inserts from that temp table, but no data is inserted.

Note: the ID on the insertion table is auto-generated. Does that affect it?

contextStream
    .EfCoreSelect("Get Transaction Data", (o, row) => o
        .Set<Transaction>()
        .Select(t => new TransactionHeader
        {
            AccountId = t.AccountId,
            AuthCode = t.AuthCode,
            CreatedBy = t.CreatedBy,
            TransactionId = t.Id,
            CreationDate = t.CreationDate,
        }),
        connectionKey: "stg")
    .EfCoreSave("Save transaction Data", o => o
        .WithKeyedConnection("dwh")
        .Entity(h => new TransactionHeader
        {
            AccountId = h.AccountId,
            AuthCode = h.AuthCode,
            CreatedBy = h.CreatedBy,
            // The stream item is already a TransactionHeader: copy its TransactionId,
            // not its own (auto-generated, still unset) Id
            TransactionId = h.TransactionId,
            CreationDate = h.CreationDate,
        })
        .SeekOn(i => new { i.AuthCode, i.CreationDate }));

@paillave
Copy link
Owner

paillave commented Jan 9, 2023

Did you make sure the process succeeded by checking the Failed property of the result returned by ExecuteAsync?

Hello, I tried to move data between two databases but it didn't work. I checked SQL Server Profiler and found that it reads the data, creates a temp table and inserts from that temp table, but no data is inserted.

Did you check how many rows are issued by each operator?
To do so, look at the StreamStatisticCounters property of the result returned by ExecuteAsync.
More details here: StreamStatisticCounters: https://paillave.github.io/Etl.Net/docs/tutorials/trackAndCheck#check-the-result
You can also get the number of affected rows by capturing the execution plan with SQL Server Profiler. That should also help you track where your rows are "lost".

Note: the ID on the insertion table is auto-generated. Does that affect it?

An auto-generated ID is not a problem; your code should work properly as is.

Let me know.

@AhmedJoum

The issue was due to a foreign key constraint. I think I need to log the ExecuteAsync results in the future.

@paillave
Owner

paillave commented Jan 9, 2023

The issue was due to a foreign key constraint. I think I need to log the ExecuteAsync results in the future.

For a start, make sure that the Failed property of the result is false, as described here:
https://paillave.github.io/Etl.Net/docs/tutorials/trackAndCheck#get-the-error-if-it-occurs

Projects
None yet
Development

No branches or pull requests

3 participants