Как прочитать миллионы строк из текстового файла и быстро вставить в таблицу

Я прошел через быстро вставить 2 миллиона строк в SQL Server ссылка и обнаружил, что я могу сделать это с помощью массовой вставки. Итак, я пытаюсь создать таблицу данных (код, как показано ниже), но, поскольку это огромный файл (более 300 КБ строк), я получаю OutOfMemoryEexception в своем коде:

string line;
DataTable data = new DataTable();
string[] columns = null;    
bool isInserted = false;           

using (TextReader tr = new StreamReader(_fileName, Encoding.Default))
{
   if (columns == null)
   {
      line = tr.ReadLine();
      columns = line.Split(',');
   }

   for (int iColCount = 0; iColCount < columns.Count(); iColCount++)
   {
      data.Columns.Add("Column" + iColCount, typeof(string));
   }                       

   string[] columnVal;

   while ((line = tr.ReadLine()) != null)
   {
        columnVal = line.Split(','); // OutOfMemoryException throwing in this line
        data.Rows.Add(columnVal);
    }
}

после долгой работы я изменил свой код, как показано ниже, но также я получаю OutOfMemoryException во время добавления строк в таблицу данных

 DataTable data = new DataTable();
 string[] columns = null;
 var line = string.Empty;
 using (TextReader tr = new StreamReader(_fileName, Encoding.Default))
 {
     if (columns == null)
     {
         line = tr.ReadLine();
         columns = line.Split(',');
     }

     for (int iColCount = 0; iColCount < columns.Count(); iColCount++)
     {
        data.Columns.Add("Column" + iColCount, typeof(string));
     }
  }

  // Split the rows in 20000 rows in different list

  var _fileList = File.ReadLines(_fileName, Encoding.Default).ToList();
  var splitChunks = new List<List<string>>();
  splitChunks = SplitFile(_fileList, 20000);

 Parallel.ForEach(splitChunks, lstChunks =>
 {
   foreach (var rows in lstChunks)
   {
     string[] lineFields = rows.Split(',');
     DataRow row = datatbl.NewRow();
     for (int iCount = 0; iCount < lineFields.Count(); iCount++)
     {
        row[iCount] = lineFields[iCount] == string.Empty ? "" : lineFields[iCount].ToString();
     }
     datatbl.Rows.Add(row);
   }
 }); 

Я могу сделать массовую вставку для следующего уровня, как показано ниже:

SqlConnection SqlConnectionObj = GetSQLConnection();
SqlBulkCopy bulkCopy = new SqlBulkCopy(SqlConnectionObj, SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.FireTriggers | SqlBulkCopyOptions.UseInternalTransaction, null);
bulkCopy.DestinationTableName = "TempTable";
bulkCopy.WriteToServer(data);

Файл содержит следующие данные

4714,1370,АУСРИХТЕН МАШИНЕЛЛ

4870,1370,ПЛАТА ШТЕНКА

0153,1900,ПИСТОЛЕТ ДЛЯ ЗАЧАТКИ

0154,1900,НОВЫЙ ТЕРМИНАТОР

0360,1470,МУ 186 МАЧ. Х ЛАВ. S/A АСТЭ PS174

9113-H22,1970,БУРОВЫЕ ДОЛОТА MC

Код должен преобразовать это в 6 строк и 3 столбца.

Есть ли более быстрый способ реализовать вышеуказанные функции для чтения файла и создания таблицы данных для массовой вставки? Так что я не должен получить память из исключения индекса.

Заранее спасибо.


person Rocky    schedule 25.07.2016    source источник
comment
Это нужно делать программно? Если это всего лишь один раз, вы можете использовать SSMS Tools   -  person Mark    schedule 25.07.2016
comment
Не могли бы вы разделить его, может быть? например, чтение нескольких тысяч строк, массовая вставка их, затем повторное использование данных для следующих нескольких тысяч и т. д.   -  person Cee McSharpface    schedule 25.07.2016
comment
Да, так и должно быть, потому что у меня есть еще код для получения файла с разных серверов на основе env. (РАЗРАБОТКА, ТСТ, ПРОД). и имеют еще несколько функций.   -  person Rocky    schedule 25.07.2016
comment
превратите этот метод чтения текстового файла в функцию IEnumerable, которая возвращает строки. В другой функции есть счетчик, и когда счет по модулю, партия равна нулю, очищает построенную таблицу данных с помощью массовой вставки.   -  person Mark    schedule 25.07.2016
comment
@Mark: как я могу добиться функциональности с помощью инструментов SSMS? не могли бы вы дать мне знать? Примечание: в моем сценарии у меня есть только файл .txt, вот и все   -  person Rocky    schedule 25.07.2016
comment
@Rocky, почему бы не реализовать IDataReader и передать его SqlBulkCopy.WriteToServer? Если вы загружаете файл CSV, я могу написать решение   -  person Artavazd Balayan    schedule 25.07.2016
comment
@ArtavazdBalayan: Извините, но я использую только и только файл .txt... Спасибо за ваш комментарий   -  person Rocky    schedule 25.07.2016
comment
@Rocky, не могли бы вы показать пример текстового файла?   -  person Artavazd Balayan    schedule 25.07.2016


Ответы (3)


Причина, по которой вы получаете OutOfMemoryException, заключается в том, что вы создаете таблицу данных в памяти и пытаетесь вставить в нее 300 000 строк.

Много данных нужно поместить в память.

Вместо этого вы должны делать это каждое определенное количество строк, которые вы читаете из текстового файла - вам нужно вставить его в базу данных.

Как вы это сделаете, зависит от вас, вы можете использовать SQL или массовое копирование, но имейте в виду, что вы не можете прочитать весь текстовый файл и сохранить его в памяти, поэтому делайте это по частям.

person gilmishal    schedule 25.07.2016
comment
Кажется, вы все еще загружаете все свои данные в один объект с данными, поправьте меня, если я ошибаюсь, потому что я не могу найти, где вы создаете экземпляр datatbl. Либо создайте новую таблицу данных в каждом вхождении foreach, либо удалите parallel.foreach и очистите данные таблицы после каждой вставки. - person gilmishal; 29.07.2016
comment
Лично я нахожу проще и быстрее использовать SQL-запрос для вставки данных в таблицу - это также, вероятно, быстрее, поскольку вы пропускаете использование объекта с данными, но ваш способ тоже может работать. - person gilmishal; 29.07.2016

Решение с SqlBulkCopy.WriteToServer и IDataReader. Я использую CSV, но надеюсь, что его будет легко модифицировать для других типов. SqlBulkCopy использует только 3 вещи из IDateReader и мы должны их реализовать:

  • public int FieldCount {get; }
  • public bool Read()
  • public object GetValue(int i)

Все остальные свойства и методы могут быть не реализованы. Интересная статья о SqlBulkCopy. Полный код: https://dotnetfiddle.net/giG3Ai. Вот с урезанной версией:

namespace SqlBulkCopy
{
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Diagnostics;
    using System.Data;
    using System.Data.SqlClient;

    public class CsvReader : IDataReader
    {
        private readonly char CSV_DELIMITER = ',';

        private readonly StreamReader _sr;
        private readonly Dictionary<string, Func<string, object>> _csv2SqlType;
        private readonly string[] _headers;

        private string _line;
        private string[] _values;

        public int FieldCount { get { return _headers.Length; } }

        public CsvReader(string filePath, Dictionary<string, Func<string, object>> csvColumn2SqlTypeDict)
        {
            if (string.IsNullOrEmpty(filePath))
                throw new ArgumentException("is null or empty", "filePath");
            if (!System.IO.File.Exists(filePath))
                throw new IOException(string.Format("{0} doesn't exist or access denied", filePath));
            if (csvColumn2SqlTypeDict == null)
                throw new ArgumentNullException("csvColumn2SqlTypeDict");

            _sr = new StreamReader(filePath);
            _csv2SqlType = csvColumn2SqlTypeDict;
            _headers = ReadHeaders();
            ValidateHeaders();
        }
        public object GetValue(int i)
        {
            // Get column value
            var colValue = _values[i];
            // Get column name
            var colName = _headers[i];
            // Try to convert to SQL type
            try { return _csv2SqlType[colName](colValue); }
            catch { return null; }
        }
        public bool Read()
        {
            if (_sr.EndOfStream) return false;

            _line = _sr.ReadLine();
            _values = _line.Split(CSV_DELIMITER);
            // If row is invalid, go to next row
            if (_values.Length != _headers.Length)
                return Read();
            return true;
        }
        public void Dispose()
        {
            _sr.Dispose();
        }
        private void ValidateHeaders()
        {
            if (_headers.Length != _csv2SqlType.Keys.Count)
                throw new InvalidOperationException(string.Format("Read {0} columns, but csv2SqlTypeDict contains {1} columns", _headers.Length, _csv2SqlType.Keys));
            foreach (var column in _headers)
            {
                if (!_csv2SqlType.ContainsKey(column))
                    throw new InvalidOperationException(string.Format("There is no convertor for column '{0}'", column));
            }
        }
        private string[] ReadHeaders()
        {
            var headerLine = _sr.ReadLine();
            if (string.IsNullOrEmpty(headerLine))
                throw new InvalidDataException("There is no header in CSV!");
            var headers = headerLine.Split(CSV_DELIMITER);
            if (headers.Length == 0)
                throw new InvalidDataException("There is no header in CSV after Split!");
            return headers;
        }
    }
    public class Program
    {        
        public static void Main(string[] args)
        {
            // Converter from CSV columns to SQL columns
            var csvColumn2SqlTypeDict = new Dictionary<string, Func<string, object>>
            {
                { "int", (s) => Convert.ToInt32(s) },
                { "str", (s) => s },
                { "double", (s) => Convert.ToDouble(s) },
                { "date", (s) => Convert.ToDateTime(s) },
            };
            Stopwatch sw = Stopwatch.StartNew();
            try
            {
                // example.csv
                /***
                   int,str,double,date
                   1,abcd,2.5,15.04.2002
                   2,dab,2.7,15.04.2007
                   3,daqqb,4.7,14.04.2007
                 ***/
                using (var csvReader = new CsvReader("example.csv", csvColumn2SqlTypeDict))
                {
                    // TODO!!! Modify to your Connection string
                    var cs = @"Server=localhost\SQLEXPRESS;initial catalog=TestDb;Integrated Security=true";
                    using (var loader = new SqlBulkCopy(cs, SqlBulkCopyOptions.Default))
                    {
                        // TODO Modify to your Destination table
                        loader.DestinationTableName = "Test";
                        // Write from csvReader to database
                        loader.WriteToServer(csvReader);
                    }
                }
            }
            catch(Exception ex)
            {
                Console.WriteLine("Got an exception: {0}", ex);
                Console.WriteLine("Press 'Enter' to quit");
                Console.ReadLine();
                return;
            }
            finally { sw.Stop(); }
            Console.WriteLine("Data has been written in {0}", sw.Elapsed);
            Console.WriteLine("Press 'Enter' to quit");
            Console.ReadLine();
        }
        private static void ShowCsv(IDataReader dr)
        {
            int i = 0;
            while (dr.Read())
            {
                Console.WriteLine("Row# {0}", i);
                for (int j = 0; j < dr.FieldCount; j++)
                {
                    Console.WriteLine("{0} => {1}", j, dr.GetValue(j));
                }
                i++;
            }
        }
    }
}
person Artavazd Balayan    schedule 25.07.2016

Я обнаружил, что забыть о DataTable и использовать старый добрый SQLClient построчно было быстрее. Тоже проще. Это также превзошло функцию потоковой передачи SQL, которая предположительно была самым быстрым способом получения данных в базе данных SQL Server.

Попробуйте и измерьте скорость, посмотрите, достаточно ли она для вас. Если это не так, вы всегда можете попытаться переформатировать файл (при необходимости) и позволить SQL Server сделать всю работу за вас, используя его Массовая вставка.

person gbjbaanb    schedule 25.07.2016