Jasinski Technical Wiki

Navigation

Home Page
Index
All Pages

Quick Search
»
Advanced Search »

Contributor Links

Create a new Page
Administration
File Management
Login/Logout
Your Profile

Other Wiki Sections

Software

PoweredBy

Page History: Fast Bulk Data Load - SQL Server

Compare Page Revisions



« Older Revision - Back to Page History - Newer Revision »


Page Revision: Thu, Feb 08, 2024, 8:40 AM


Overview

When copying data from one database to another, the easiest way to code it is the following.

(1) Execute a SELECT statement against the source database. (2) For each row... (2.1) Optionally populate each foreign key field. (2.2) Execute an INSERT statement against the target database. 

This approach does not scale well, because every row requires its own INSERT round trip to the target database.

This article presents a substantially faster way to load data.

Reusable Base Class: BulkDataCopierBase

/// <summary>
/// Base class for copying rows from an ODBC source into a SQL Server table.
/// The total row count is split into row-number segments, and each segment is
/// streamed into the target table with <c>SqlBulkCopy</c>, several segments at a time.
/// Derived classes supply the SQL, the target table and the tuning values.
/// </summary>
public abstract class BulkDataCopierBase
{
    // Applied to both the source SELECT and the bulk copy: 300 sec = 5 min.
    private const int TimeoutSeconds = 300;

    private readonly string _sourceOdbcConn;

    private readonly string _targetAdoConn;

    //private readonly NLog.Logger _nlog;

    /// <summary>
    /// Stores the connection strings used by every segment copy.
    /// </summary>
    /// <param name="sourceOdbcConn">ODBC connection string of the source database.</param>
    /// <param name="targetAdoConn">SQL Server connection string of the target database.</param>
    public BulkDataCopierBase(
        string sourceOdbcConn,
        string targetAdoConn
        )
    {
        _sourceOdbcConn = sourceOdbcConn;
        _targetAdoConn = targetAdoConn;
        //_nlog = NLogBuilder.ConfigureNLog("nlog.config").GetCurrentClassLogger();
    }

    /// <summary>
    /// Counts the source rows, splits them into segments and bulk copies the
    /// segments in parallel.
    /// </summary>
    /// <returns>
    /// The source row count reported by <see cref="GetRowCountSql()"/>, or -1 when
    /// that query returned no value.
    /// </returns>
    /// <exception cref="InvalidOperationException">
    /// <see cref="GetRowsPerSegment()"/> returned zero or a negative number.
    /// </exception>
    public async Task<long> ExecuteAsync()
    {
        RowCount = await GetRowCountAsync();

        // Nothing to copy: skip segmenting instead of running a query for an empty range.
        if (RowCount <= 0)
        {
            return RowCount;
        }

        var rowsPerSegment = GetRowsPerSegment();

        // A non-positive segment size can never cover the range, so fail fast.
        if (rowsPerSegment <= 0)
        {
            throw new InvalidOperationException(
                $"{nameof(GetRowsPerSegment)} must return a positive value, but returned {rowsPerSegment}.");
        }

        var segments = new RowNumberRange(RowCount)
            .BreakIntoSegments(rowsPerSegment);

        var opt = new ParallelOptions
        {
            MaxDegreeOfParallelism = GetMaxDegreesOfParallelism()
        };

        var targetTable = GetTargetTable();

        await Parallel.ForEachAsync(segments, opt, async (segment, ct) =>
        {
            var sourceSql = GetSql(segment);

            // Flow the loop's token so an in-flight segment can stop when the loop is cancelled.
            await CopySegmentAsync(sourceSql, targetTable, ct);
        });

        return RowCount;
    }

    /// <summary>
    /// The source row count determined by the most recent <see cref="ExecuteAsync()"/> call.
    /// </summary>
    public long RowCount { get; set; }

    /// <summary>
    /// This is expected to return the SQL statement to retrieve data for copying,
    /// based on the row number range. The order of columns is expected to match
    /// the order of columns for the table returned by the
    /// <seealso cref="GetTargetTable()"/> method.
    /// </summary>
    /// <param name="range">The segment of row numbers to select.</param>
    /// <returns>The SELECT statement to run against the ODBC source.</returns>
    protected abstract string GetSql(RowNumberRange range);

    /// <summary>
    /// This is expected to return a SQL statement that returns a single 
    /// row with a single field that will indicate the total number of
    /// records that will be copied.
    /// </summary>
    /// <returns>The row-count statement to run against the ODBC source.</returns>
    protected abstract string GetRowCountSql();

    /// <summary>
    /// This is expected to return the maximum number of rows copied per segment.
    /// It must be greater than zero.
    /// </summary>
    protected abstract long GetRowsPerSegment();

    /// <summary>
    /// This is expected to return the value assigned to
    /// <c>ParallelOptions.MaxDegreeOfParallelism</c> for the segment loop.
    /// </summary>
    protected abstract int GetMaxDegreesOfParallelism();

    /// <summary>
    /// This is expected to return the table in the target database to copy data into. 
    /// The order of columns is expected to match the order of columns returned by the
    /// <seealso cref="GetSql(RowNumberRange)"/> method.
    /// </summary>
    /// <returns>The SQL Server table name.</returns>
    protected abstract string GetTargetTable();

    /// <summary>
    /// Does a SqlBulkCopy of data from the ODBC source to the target SQL Server table.
    /// </summary>
    /// <param name="sourceSql">SQL statement against the ODBC source</param>
    /// <param name="targetTable">SQL Server table name</param>
    /// <param name="cancellationToken">Token passed to the open, read and write calls.</param>
    /// <returns>The number of rows reported as copied by the bulk copy.</returns>
    private async Task<long> CopySegmentAsync(
        string sourceSql,
        string targetTable,
        System.Threading.CancellationToken cancellationToken
        )
    {
        try
        {
            //_nlog.Info($"Bulk copying {targetTable} started. =====");
            long recordCount;
            var sw = Stopwatch.StartNew();

            await using (var srcOdbcConn = new OdbcConnection(_sourceOdbcConn))
            {
                await srcOdbcConn.OpenAsync(cancellationToken);

                // using declarations release the command, reader and target objects
                // in reverse order, even when the copy throws part-way through.
                using var selectCmd = new OdbcCommand(sourceSql, srcOdbcConn);

                selectCmd.CommandTimeout = TimeoutSeconds;

                await using var reader = await selectCmd.ExecuteReaderAsync(
                    System.Data.CommandBehavior.SequentialAccess,
                    cancellationToken);

                await using var tgtAdoConn = new SqlConnection(_targetAdoConn);

                await tgtAdoConn.OpenAsync(cancellationToken);

                using var bc = new SqlBulkCopy(tgtAdoConn);

                bc.DestinationTableName = targetTable;
                bc.BulkCopyTimeout = TimeoutSeconds;
                await bc.WriteToServerAsync(reader, cancellationToken);
                recordCount = bc.RowsCopied;
            }

            // Stop the clock only after both connections are closed, as before.
            sw.Stop();

            var msg = $@"Loaded {recordCount:#,##0} row(s) into '{targetTable}' in {sw.Elapsed:h\:mm\:ss}";
            Debug.Print(msg);
            //_nlog.Info(msg);

            //_nlog.Info($"Bulk copying {targetTable} finished. -----");

            return recordCount;
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            // Cancellation is not a copy failure, so it bypasses the debug assertion
            // and simply propagates to the caller.
            Debug.Print(ex.Message);
            Debug.Assert(false);
            //_nlog.Info($"Error while bulk copying {targetTable}. {ex.Message}");
            throw;
        }
    }

    /// <summary>
    /// Runs <see cref="GetRowCountSql()"/> against the ODBC source passed to the
    /// constructor and returns its single value.
    /// </summary>
    /// <returns>The row count, or -1 when the query returns no row or a NULL value.</returns>
    private async Task<long> GetRowCountAsync()
    {
        await using var conn = new OdbcConnection(_sourceOdbcConn);

        await conn.OpenAsync();

        using var cmd = new OdbcCommand(GetRowCountSql(), conn);

        var value = await cmd.ExecuteScalarAsync();

        if (value is null || value is DBNull)
        {
            return -1;
        }

        // Drivers differ on the numeric type of a count (int, bigint, decimal);
        // Convert accepts all of them where GetInt64 would throw.
        return Convert.ToInt64(value);
    }
}

RowNumberRange Class

/// <summary>
/// An inclusive range of row numbers, from <see cref="FirstRow"/> to
/// <see cref="LastRow"/>, that can be split into smaller consecutive ranges.
/// </summary>
[DebuggerDisplay("{DisplayText}")]
public class RowNumberRange
{
    /// <summary>
    /// Creates a range whose bounds are set through the properties.
    /// </summary>
    public RowNumberRange()
    { }

    /// <summary>
    /// Creates the range 1 to <paramref name="rowCount"/>.
    /// </summary>
    /// <param name="rowCount">Number of the last row in the range.</param>
    public RowNumberRange(long rowCount)
    {
        FirstRow = 1;
        LastRow = rowCount;
    }

    /// <summary>First row number of the range (inclusive).</summary>
    public long FirstRow { get; set; }

    /// <summary>Last row number of the range (inclusive).</summary>
    public long LastRow { get; set; }

    /// <summary>Human-readable form of the range, shown in the debugger.</summary>
    public string DisplayText
    {
        get
        {
            return $"Row {FirstRow:#,##0} to {LastRow:#,##0}";
        }
    }

    /// <summary>
    /// SQL predicate selecting the rows of this range by a <c>RowNum</c> column.
    /// </summary>
    public string SqlCondition
    {
        get
        {
            // This text is SQL, not UI: format the numbers culture-independently.
            return System.FormattableString.Invariant(
                $"RowNum between {FirstRow} and {LastRow}");
        }
    }

    /// <summary>
    /// Splits this range into consecutive, non-overlapping segments of at most
    /// <paramref name="rowsPerSegment"/> rows each. The final segment may be shorter.
    /// </summary>
    /// <param name="rowsPerSegment">Maximum number of rows per segment; must be positive.</param>
    /// <returns>
    /// The segments in ascending order, or an empty list when the range contains
    /// no rows (<see cref="LastRow"/> is less than <see cref="FirstRow"/>).
    /// </returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="rowsPerSegment"/> is zero or negative.
    /// </exception>
    public List<RowNumberRange> BreakIntoSegments(
        long rowsPerSegment
        )
    {
        // A non-positive size never advances through the range, so the split
        // could not terminate.
        if (rowsPerSegment <= 0)
        {
            throw new ArgumentOutOfRangeException(
                nameof(rowsPerSegment),
                rowsPerSegment,
                "The number of rows per segment must be greater than zero.");
        }

        var result = new List<RowNumberRange>();
        var row1 = FirstRow;

        while (row1 <= LastRow)
        {
            // Compare against the rows that remain rather than computing
            // row1 + rowsPerSegment first, which could overflow for very large
            // sizes (assumes non-negative row numbers).
            var row2 = LastRow - row1 < rowsPerSegment
                ? LastRow
                : row1 + rowsPerSegment - 1;

            result.Add(new RowNumberRange { FirstRow = row1, LastRow = row2 });

            // Stop before row2 + 1 so a range ending at long.MaxValue cannot wrap.
            if (row2 == LastRow)
            {
                break;
            }

            row1 = row2 + 1;
        }

        return result;
    }

}

ScrewTurn Wiki version 3.0.1.400. Some of the icons created by FamFamFam. Except where noted, all contents Copyright © 1999-2024, Patrick Jasinski.