在.net1.1中无论是对于批量插入整个datatable中的所有数据到数据库中,还是进行不同数据源之间的迁移,都不是很方便。而在.net2.0中,sqlclient命名空间下增加了几个新类帮助我们通过datatable或datareader批量迁移数据。数据源可以来自关系数据库或者xml文件,甚至webservice返回结果。其中最重要的一个类就是sqlbulkcopy类,使用它可以很方便的帮助我们把数据源的数据迁移到目标数据库中。
下面我们先通过一个简单的例子说明这个类的使用:
datetime starttime;
protected void button1_click(object sender, eventargs e)
{
starttime = datetime.now;
string srcconstring;
string desconstring;
sqlconnection srccon = new sqlconnection();
sqlconnection descon = new sqlconnection();
sqlcommand srccom = new sqlcommand();
sqldataadapter srcadapter = new sqldataadapter();
datatable dt = new datatable();
srcconstring =
configurationmanager.connectionstrings["srcdbconnectionstring"].connectionstring;
desconstring =
configurationmanager.connectionstrings["desdbconnectionstring"].connectionstring;
srccon.connectionstring = srcconstring;
srccom.connection = srccon;
srccom.commandtext = " select * from [srctable]";
srccom.commandtype = commandtype.text;
srccom.connection.open();
srcadapter.selectcommand = srccom;
srcadapter.fill(dt);
sqlbulkcopy desbulkop;
desbulkop = new sqlbulkcopy(desconstring,
sqlbulkcopyoptions.useinternaltransaction);
desbulkop.bulkcopytimeout = 500000000;
desbulkop.sqlrowscopied +=
new sqlrowscopiedeventhandler(onrowscopied);
desbulkop.notifyafter = dt.rows.count;
try
{
desbulkop.destinationtablename = "srctable";
desbulkop.writetoserver(dt);
}
catch (exception ex)
{
lblresult.text = ex.message;
}
finally
{
srccon.close();
descon.close();
}
}
private void onrowscopied(object sender, sqlrowscopiedeventargs args)
{
lblcounter.text += args.rowscopied.tostring() + " rows are copied<br>";
timespan copytime = datetime.now - starttime;
lblcounter.text += "copy time:" + copytime.seconds.tostring() + "." + copytime.milliseconds.tostring() + " seconds";
}
接着具体分析这几行代码:
sqlbulkcopy desbulkop;
desbulkop = new sqlbulkcopy(desconstring, sqlbulkcopyoptions.useinternaltransaction);先生成sqlbulkcopy 实例,构造函数指定了目标数据库,使用sqlbulkcopyoptions.useinternaltransaction是指迁移动作指定在一个transaction当中,如果数据迁移中产生错误或异常将发生回滚。其他选项请参考msdn。
desbulkop.bulkcopytimeout = 500000000;
指定操作完成的timeout时间
desbulkop.sqlrowscopied += new sqlrowscopiedeventhandler(onrowscopied);
desbulkop.notifyafter = dt.rows.count;
try
{
desbulkop.destinationtablename = "srctable";
desbulkop.writetoserver(dt);
}
notifyafter属性指定通知通知事件前处理的数据行数,在这里指定为表的行数,并添加sqlrowscopied事件输出整个迁移过程的时间。writetoserver方法就是将数据源拷备到目标数据库。在使用writetoserver方法之前必须先指定destinationtablename属性,也就是目标数据库的表名,
我们还可以自己定义一个transaction,例如:
sqltransaction transaction;
transaction =
srccom.connection.begintransaction();
sqlbulkcopy desbulkop;
desbulkop = new sqlbulkcopy(new sqlconnection(desconstring),
sqlbulkcopyoptions.default,
transaction);
try
{
//..
}
catch{}
finally
{
transaction.commit();
}
另外还有一个sqlbulkcopycolumnmapping类,可以让数据源字段映射到目标数据中命名不同的字段上。也就是说如果目标数据和源数据的列名不同时,可以用这个类进行映射:
sqlbulkcopycolumnmapping colmap = new sqlbulkcopycolumnmapping("srccol", "descol");
desbulkop.columnmappings.add(colmap);
或者可以直接添加映射:
desbulkop.columnmappings.add("srccol", "descol");
性能问题:
我使用上面的例子测试,迁移了2万条左右的记录,花的时间不到一秒,应改说性能还是不错的。另外,使用sql profile监视迁移事件,可以看见请求记录非常少,只有几条而已。据说使用sqlbulkcopy可以大大减少数据迁移的时间。
新闻热点
疑难解答
图片精选