Jasinski Technical Wiki

Navigation

Home Page
Index
All Pages

Quick Search
»
Advanced Search »

Contributor Links

Create a new Page
Administration
File Management
Login/Logout
Your Profile

Other Wiki Sections

Software

PoweredBy

Page History: Fast Bulk Data Load - SQL Server

Compare Page Revisions



« Older Revision - Back to Page History - Newer Revision »


Page Revision: Thu, Feb 08, 2024, 8:36 AM


Reusable Base Class: BulkDataCopierBase

public abstract class BulkDataCopierBase
{
    private string _sourceOdbcConn;

    private string _targetAdoConn;

    //private readonly NLog.Logger _nlog;

    public BulkDataCopierBase(
        string sourceOdbcConn,
        string targetAdoConn
        )
    {
        _sourceOdbcConn = sourceOdbcConn;
        _targetAdoConn = targetAdoConn;
        //_nlog = NLogBuilder.ConfigureNLog("nlog.config").GetCurrentClassLogger();
    }

    public async Task<long> ExecuteAsync()
    {
        RowCount = await GetRowCount();

        var segments = new RowNumberRange(RowCount)
            .BreakIntoSegments(GetRowsPerSegment());

        var opt = new ParallelOptions 
        { 
            MaxDegreeOfParallelism = GetMaxDegreesOfParallelism() 
        };

        var targetTable = GetTargetTable();

        await Parallel.ForEachAsync(segments, opt, async (segment, ct) =>
        {
            var sourceSql = GetSql(segment);
            await CopySegmentAsync(sourceSql, targetTable);
        });

        return RowCount;
    }

    public long RowCount { get; set; }

    /// <summary>
    /// This is expected to return the SQL statement to retrieve data for copying,
    /// based on the row number range. The order of columns is expected to match
    /// the order of columns for the table returned by the
    /// <seealso cref="GetTargetTable()"/> method.
    /// </summary>
    /// <param name="range"></param>
    /// <returns></returns>
    protected abstract string GetSql(RowNumberRange range);

    /// <summary>
    /// This is expected to return a SQL statement that returns a single 
    /// row with a single field that will indicate the total number of
    /// records that will be copied.
    /// </summary>
    /// <returns></returns>
    protected abstract string GetRowCountSql();

    protected abstract long GetRowsPerSegment();
    protected abstract int GetMaxDegreesOfParallelism();

    /// <summary>
    /// This is expected to return the table in the target database to copy data into. 
    /// The order of columns is expected to match the order of columns returned by the
    /// <seealso cref="GetSql(RowNumberRange)"/> method.
    /// </summary>
    /// <returns></returns>
    protected abstract string GetTargetTable();

    /// <summary>
    /// Does a SqlBulkCopy of data from the ODBC source to the target SQL Server table.
    /// </summary>
    /// <param name="sourceSql">SQL statement against the ODBC source</param>
    /// <param name="targetTable">SQL Server table name</param>
    private async Task<long> CopySegmentAsync(
        string sourceSql,
        string targetTable
        )
    {
        try
        {
            //_nlog.Info($"Bulk copying {targetTable} started. =====");
            long recordCount = 0;
            var sw = Stopwatch.StartNew();

            using (var srcOdbcConn = new OdbcConnection(_sourceOdbcConn))
            {
                await srcOdbcConn.OpenAsync();

                var selectCmd = new OdbcCommand(sourceSql, srcOdbcConn);

                selectCmd.CommandTimeout = 300; // 300 sec = 5 min

                var reader = await selectCmd.ExecuteReaderAsync(System.Data.CommandBehavior.SequentialAccess);

                using (var tgtAdoConn = new SqlConnection(_targetAdoConn))
                {
                    await tgtAdoConn.OpenAsync();

                    using (var bc = new SqlBulkCopy(tgtAdoConn))
                    {
                        bc.DestinationTableName = targetTable;
                        bc.BulkCopyTimeout = 300; // 300 sec = 5 min
                        await bc.WriteToServerAsync(reader);
                        recordCount = bc.RowsCopied;
                        bc.Close();
                    }

                    await tgtAdoConn.CloseAsync();
                }

                await reader.CloseAsync();

                await srcOdbcConn.CloseAsync();

                sw.Stop();

                var msg = $@"Loaded {recordCount:#,##0} row(s) into '{targetTable}' in {sw.Elapsed:h\:mm\:ss}";
                Debug.Print(msg);
                //_nlog.Info(msg);
            }

            //_nlog.Info($"Bulk copying {targetTable} finished. -----");

            return recordCount;
        }
        catch (Exception ex)
        {
            Debug.Print(ex.Message);
            Debug.Assert(false);
            //_nlog.Info($"Error while bulk copying {targetTable}. {ex.Message}");
            throw;
        }
    }

    private async Task<long> GetRowCount()
    {
        long result = -1;

        if (Config.Current == null)
        {
            return result;
        }

        using (var conn = new OdbcConnection(Config.Current.SourceOdbcConnection))
        {
            await conn.OpenAsync();

            var sql = GetRowCountSql();

            using (var cmd = new OdbcCommand(sql, conn))
            {
                var reader = await cmd.ExecuteReaderAsync();

                if (reader.Read())
                {
                    result = reader.GetInt64(0);
                }
            }
        }

        return result;
    }
}

RowNumberRange Class

[DebuggerDisplay("{DisplayText}")]
public class RowNumberRange
{
    public RowNumberRange()
    { }


    public RowNumberRange(long rowCount)
    {
        FirstRow = 1;
        LastRow = rowCount;
    }

    public long FirstRow { get; set; }
    public long LastRow { get; set; }
    public string DisplayText
    {
        get
        {
            return $"Row {FirstRow:#,##0} to {LastRow:#,##0}";
        }
    }

    public string SqlCondition
    {
        get
        {
            return $"RowNum between {FirstRow} and {LastRow}";
        }
    }

    public List<RowNumberRange> BreakIntoSegments(
        long rowsPerSegment
        )
    {
        var result = new List<RowNumberRange>();
        var row1 = FirstRow;
        long row2;

        do
        {
            row2 = row1 + rowsPerSegment - 1;

            if (row2 > LastRow)
            {
                row2 = LastRow;
            }

            result.Add(new RowNumberRange { FirstRow = row1, LastRow = row2 });

            row1 = row2 + 1;

        } while (row2 < LastRow);

        return result;
    }
        
}

ScrewTurn Wiki version 3.0.1.400. Some of the icons created by FamFamFam. Except where noted, all contents Copyright © 1999-2024, Patrick Jasinski.