DBデータをストリーム処理する
CEP（複合イベント処理）の記事を読んでいて、ふと業務アプリのバッチ処理にも応用できないかと思いました。ストリーム処理の構造に、バッチ処理とのアナロジーを感じたためです。
.NETのADO.NETには、ストリーム的な読み込み手段としてDataReaderが、書き込み手段としてSqlBulkCopyがあります。これらをLINQでつなげて、ストリームのように動作させるイメージです。
とりあえずやってみました。以下のコードの後半部分が該当の処理です。
Selectした結果のDataReaderを一旦IEnumerable<T>に変換してLINQで加工し、それを再びIDataReaderに変換してSqlBulkCopyに渡しています。
// Copies sales.SalesOrderDetail into TestTable without loading the source rows into a list first:
// the source reader is adapted to IEnumerable<SalesOrderDetail>, projected with LINQ, and adapted
// back to an IDataReader that SqlBulkCopy pulls rows from as it writes.
// Every IDisposable in the pipeline (connection, command, readers, bulk copy) is disposed.
using (SqlConnection sourceConnection = new SqlConnection(Settings.Default.AdventureWorksConnectionString))
using (SqlCommand command = new SqlCommand("select * from sales.SalesOrderDetail", sourceConnection))
{
    sourceConnection.Open();
    using (SqlDataReader reader = command.ExecuteReader())
    using (SqlBulkCopy bulkCopy = new SqlBulkCopy(Settings.Default.AdventureWorksConnectionString))
    {
        bulkCopy.DestinationTableName = "TestTable";

        // This part is processed as a stream: rows flow from the reader through the projection
        // into SqlBulkCopy. AsParallel() does not preserve source order, which is presumably
        // acceptable for a bulk insert — confirm if TestTable relies on insertion order.
        using (var transformedRows = reader.ToEnumerable<SalesOrderDetail>()
            .AsParallel()
            .Select(x => new TestTable { ID = x.SalesOrderID, Data = x.OrderQty * x.UnitPrice })
            .ToDataReader())
        {
            bulkCopy.WriteToServer(transformedRows);
        }
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data;
using System.Reflection;
using System.Collections;

namespace RdbStream
{
    /// <summary>
    /// Extension methods that bridge ADO.NET readers and LINQ sequences, so rows can be read from an
    /// <see cref="IDataReader"/>, transformed with LINQ, and handed to any consumer that accepts an
    /// <see cref="IDataReader"/> (for example SqlBulkCopy.WriteToServer).
    /// </summary>
    public static class DataReaderLinqExtension
    {
        /// <summary>
        /// Lazily maps each row of <paramref name="dr"/> to a new <typeparamref name="T"/>.
        /// </summary>
        /// <remarks>
        /// Columns are matched to public, writable, non-indexed properties of <typeparamref name="T"/>
        /// by name using an ordinal, case-insensitive comparison (culture-independent). Columns with no
        /// matching property are ignored, and DBNull values leave the property at its default value.
        /// Values are assigned as returned by <see cref="IDataRecord.GetValue"/>, so each column's CLR
        /// type must be assignable to the matching property's type.
        /// The reader is closed when enumeration finishes, fails, or is abandoned early (enumerator
        /// disposed), so the returned sequence is single-pass.
        /// </remarks>
        /// <param name="dr">The source reader; rows are consumed as the sequence is enumerated.</param>
        /// <returns>A sequence with one <typeparamref name="T"/> per row.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="dr"/> is null.</exception>
        public static IEnumerable<T> ToEnumerable<T>(this IDataReader dr) where T : new()
        {
            // Validate eagerly; the iterator below would otherwise defer the failure to enumeration.
            if (dr == null)
            {
                throw new ArgumentNullException("dr");
            }
            return ReadRows<T>(dr);
        }

        // Iterator behind ToEnumerable: yields one populated T per row and always closes the reader.
        private static IEnumerable<T> ReadRows<T>(IDataReader dr) where T : new()
        {
            try
            {
                // Column-to-property map, built from the reader's metadata on the first row.
                PropertyInfo[] columnProperties = null;
                while (dr.Read())
                {
                    if (columnProperties == null)
                    {
                        columnProperties = MapColumnsToProperties(dr, typeof(T));
                    }

                    // Box once so that, when T is a struct, every SetValue call mutates the same
                    // boxed instance instead of a throw-away copy.
                    object newObject = new T();
                    for (int index = 0; index < columnProperties.Length; index++)
                    {
                        PropertyInfo property = columnProperties[index];
                        if (property != null && !dr.IsDBNull(index))
                        {
                            property.SetValue(newObject, dr.GetValue(index), null);
                        }
                    }
                    yield return (T)newObject;
                }
            }
            finally
            {
                // Runs on normal completion, on exceptions, and when the consumer disposes early.
                dr.Close();
            }
        }

        // For each column of `record`, returns the property of `type` with the same name
        // (ordinal, case-insensitive), or null when no assignable property matches.
        private static PropertyInfo[] MapColumnsToProperties(IDataRecord record, Type type)
        {
            var propertiesByName = new Dictionary<string, PropertyInfo>(StringComparer.OrdinalIgnoreCase);
            foreach (PropertyInfo property in type.GetProperties())
            {
                // Indexers and read-only properties cannot be assigned with SetValue(target, value, null).
                // If several property names differ only by case, the first one returned wins.
                if (property.CanWrite
                    && property.GetIndexParameters().Length == 0
                    && !propertiesByName.ContainsKey(property.Name))
                {
                    propertiesByName.Add(property.Name, property);
                }
            }

            var columnProperties = new PropertyInfo[record.FieldCount];
            for (int index = 0; index < columnProperties.Length; index++)
            {
                PropertyInfo property;
                propertiesByName.TryGetValue(record.GetName(index), out property);
                columnProperties[index] = property;
            }
            return columnProperties;
        }

        /// <summary>
        /// Exposes <paramref name="entities"/> as a forward-only <see cref="IDataReader"/> whose columns
        /// are the public, readable, non-indexed properties of <typeparamref name="TEntity"/>.
        /// </summary>
        /// <remarks>
        /// Elements are pulled from the sequence one <see cref="IDataReader.Read"/> call at a time.
        /// Null property values are reported as <see cref="DBNull.Value"/>. Closing or disposing the
        /// reader disposes the sequence's enumerator.
        /// </remarks>
        /// <param name="entities">The rows to expose.</param>
        /// <returns>A reader over <paramref name="entities"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="entities"/> is null.</exception>
        public static IDataReader ToDataReader<TEntity>(this IEnumerable<TEntity> entities) where TEntity : class
        {
            if (entities == null)
            {
                throw new ArgumentNullException("entities");
            }
            return new BulkCopyDataReader<TEntity>(entities);
        }

        // Forward-only IDataReader over a sequence of entities; one column per readable property.
        private sealed class BulkCopyDataReader<TEntity> : IDataReader where TEntity : class
        {
            private readonly IEnumerator<TEntity> _entities;
            private readonly PropertyInfo[] _properties;
            private readonly Dictionary<string, int> _ordinals = new Dictionary<string, int>();
            private bool _hasCurrent;
            private bool _isClosed;

            public BulkCopyDataReader(IEnumerable<TEntity> entities)
            {
                _entities = entities.GetEnumerator();

                // Indexers and write-only properties cannot be read with GetValue(target, null).
                _properties = typeof(TEntity).GetProperties()
                    .Where(p => p.CanRead && p.GetIndexParameters().Length == 0)
                    .ToArray();
                for (int i = 0; i < _properties.Length; i++)
                {
                    if (!_ordinals.ContainsKey(_properties[i].Name))
                    {
                        _ordinals.Add(_properties[i].Name, i);
                    }
                }
            }

            public int FieldCount
            {
                get { return _properties.Length; }
            }

            // Flat, single-level reader.
            public int Depth
            {
                get { return 0; }
            }

            public bool IsClosed
            {
                get { return _isClosed; }
            }

            // Not a data-modifying command.
            public int RecordsAffected
            {
                get { return -1; }
            }

            public object this[int i]
            {
                get { return GetValue(i); }
            }

            public object this[string name]
            {
                get { return GetValue(GetOrdinal(name)); }
            }

            public bool Read()
            {
                if (_isClosed)
                {
                    throw new InvalidOperationException("Invalid attempt to call Read when the reader is closed.");
                }
                _hasCurrent = _entities.MoveNext();
                return _hasCurrent;
            }

            // There is exactly one result set, so there is never a next one.
            public bool NextResult()
            {
                _hasCurrent = false;
                return false;
            }

            public void Close()
            {
                if (!_isClosed)
                {
                    _isClosed = true;
                    _hasCurrent = false;
                    _entities.Dispose();
                }
            }

            public void Dispose()
            {
                Close();
            }

            public string GetName(int i)
            {
                return _properties[i].Name;
            }

            // Exact match first, then an ordinal case-insensitive match; IndexOutOfRangeException
            // when no column has that name, as the IDataRecord contract specifies.
            public int GetOrdinal(string name)
            {
                int ordinal;
                if (_ordinals.TryGetValue(name, out ordinal))
                {
                    return ordinal;
                }
                for (int i = 0; i < _properties.Length; i++)
                {
                    if (string.Equals(_properties[i].Name, name, StringComparison.OrdinalIgnoreCase))
                    {
                        return i;
                    }
                }
                throw new IndexOutOfRangeException(name);
            }

            // Nullable<T> properties are reported as T; nulls surface as DBNull instead.
            public Type GetFieldType(int i)
            {
                Type type = _properties[i].PropertyType;
                return Nullable.GetUnderlyingType(type) ?? type;
            }

            public string GetDataTypeName(int i)
            {
                return GetFieldType(i).Name;
            }

            public object GetValue(int i)
            {
                if (!_hasCurrent)
                {
                    throw new InvalidOperationException("Invalid attempt to read when no data is present.");
                }
                return _properties[i].GetValue(_entities.Current, null) ?? DBNull.Value;
            }

            public int GetValues(object[] values)
            {
                if (values == null)
                {
                    throw new ArgumentNullException("values");
                }
                int count = Math.Min(values.Length, _properties.Length);
                for (int i = 0; i < count; i++)
                {
                    values[i] = GetValue(i);
                }
                return count;
            }

            public bool IsDBNull(int i)
            {
                return GetValue(i) is DBNull;
            }

            public bool GetBoolean(int i) { return (bool)GetValue(i); }
            public byte GetByte(int i) { return (byte)GetValue(i); }
            public char GetChar(int i) { return (char)GetValue(i); }
            public DateTime GetDateTime(int i) { return (DateTime)GetValue(i); }
            public decimal GetDecimal(int i) { return (decimal)GetValue(i); }
            public double GetDouble(int i) { return (double)GetValue(i); }
            public float GetFloat(int i) { return (float)GetValue(i); }
            public Guid GetGuid(int i) { return (Guid)GetValue(i); }
            public short GetInt16(int i) { return (short)GetValue(i); }
            public int GetInt32(int i) { return (int)GetValue(i); }
            public long GetInt64(int i) { return (long)GetValue(i); }
            public string GetString(int i) { return (string)GetValue(i); }

            public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length)
            {
                return CopyChunk((byte[])GetValue(i), fieldOffset, buffer, bufferoffset, length);
            }

            public long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length)
            {
                object value = GetValue(i);
                string text = value as string;
                char[] chars = text != null ? text.ToCharArray() : (char[])value;
                return CopyChunk(chars, fieldoffset, buffer, bufferoffset, length);
            }

            // Chunked-read semantics of IDataRecord.GetBytes/GetChars: a null buffer asks for the total
            // length; otherwise copy up to `length` elements starting at `dataOffset`.
            private static long CopyChunk<T>(T[] data, long dataOffset, T[] buffer, int bufferOffset, int length)
            {
                if (buffer == null)
                {
                    return data.Length;
                }
                long available = Math.Max(0L, data.Length - dataOffset);
                int count = (int)Math.Min(available, (long)length);
                if (count > 0)
                {
                    Array.Copy(data, dataOffset, buffer, bufferOffset, count);
                }
                return count;
            }

            public IDataReader GetData(int i)
            {
                throw new NotSupportedException("Nested data readers are not supported.");
            }

            // Minimal schema: name, ordinal, CLR type and nullability of each column.
            public DataTable GetSchemaTable()
            {
                var schema = new DataTable("SchemaTable");
                schema.Columns.Add("ColumnName", typeof(string));
                schema.Columns.Add("ColumnOrdinal", typeof(int));
                schema.Columns.Add("DataType", typeof(Type));
                schema.Columns.Add("AllowDBNull", typeof(bool));
                for (int i = 0; i < _properties.Length; i++)
                {
                    Type propertyType = _properties[i].PropertyType;
                    bool allowNull = !propertyType.IsValueType || Nullable.GetUnderlyingType(propertyType) != null;
                    schema.Rows.Add(_properties[i].Name, i, GetFieldType(i), allowNull);
                }
                return schema;
            }
        }
    }
}