WCF 学习总结7 -- 流模式(Streaming)实现文件上传

当有大量的数据要传输时,WCF的流模式是比较好的选择,因为流模式不是全部加载到内存后传输而是一边读取一边传输消息,所以流模式改善了系统的吞吐量和响应效率。 需要注意的是当启用了流模式,以下WCF功能无法使用: (1) 无法执行消息正文的数字签名; (2) 不支持Session; (3) 不支持可靠会话。WCF中定义的Binding中支持流模式的只有: BasicHttpBinding, NetTcpBinding, NetNamingPipeBinding, WebHttpBinding。

流和会话
在流与基于会话的绑定一起调用时可能会产生意外行为。可通过单一通道(数据报通道)执行所有流调用,该通道不支持会话,即使将正在使用的绑定配置为使用会话也是如此。如果多个客户端通过基于会话的绑定对同一服务对象进行流调用,并且 ConcurrencyMode = Single,同时 InstanceContextMode = PerSession,则所用调用都必须经过数据报通道,因此一次只处理一个调用。一个或多个客户端因此可能会超时。通过将该服务对象的 InstanceContextMode 设置为 PerCall 或将 ConcurrencyMode 设置为 Multiple 可以解决。

流模式契约示例
启用流模式时,要求操作契约只能以单个的Stream对象为输入输出参数,你可以按照以下示例定义契约:
[OperationContract] Stream GetStream(string data); [OperationContract] bool UploadStream(Stream stream); [OperationContract] Stream EchoStream(Stream stream);
注意:上面的UploadStream, EchoStream中添加第二个参数会导致流模式无效,因此如果想使用流传送更多信息,可以通过消息头携带信息,而消息正文是流内容。
[MessageContract] public class FileUploadData { [MessageHeader] public string FileName { get; set; } [MessageHeader] public int FileSize { get; set; } [MessageHeader] public string FileUniqueID { get; set; } [MessageBodyMember] public Stream FileData { get; set; } }

流模式实现文件上传
大数据传输的过程是比较耗时的,所以应该考虑异步调用,提高系统的响应。这里可以通过客户端生成代理时,指定生成异步操作。
WCF 学习总结7 -- 流模式(Streaming)实现文件上传
流模式最常用的场景就是文件的上传和下载了,一般上传下载的过程比较长,比较好的客户体验是实时的把进度告诉客户端。我一开始考虑用Duplex在写流的过程中,即时通知客户端。而上面的Binding方式中只有NetTcpBinding支持Duplex,但实验后发现一旦使用Duplex,流模式就失效了。所以后来只好采用客户端轮询的方式来获取当前的进度,如下图:
WCF 学习总结7 -- 流模式(Streaming)实现文件上传

服务契约(IUpLoadService.cs)
using System; using System.Collections.Generic; using System.Linq; using System.Runtime.Serialization; using System.ServiceModel; using System.Text; using System.IO; namespace WcfUploadServiceLib { [ServiceContract] public interface IUpLoadService { [OperationContract(IsOneWay = true)] void UploadFile(FileUploadData request); [OperationContract] int GetUploadFileInfo(string id); } [MessageContract] public class FileUploadData { [MessageHeader] public string FileName { get; set; } [MessageHeader] public int FileSize { get; set; } [MessageHeader] public string FileUniqueID { get; set; } [MessageBodyMember] public Stream FileData { get; set; } } }
定义了两个契约: 1. 用于上传; 2. 用于获取上传进度(根据客户端生成的唯一ID)

服务端实现(UpLoadService.cs)
using System; using System.Collections.Generic; using System.Linq; using System.Runtime.Serialization; using System.ServiceModel; using System.Text; using System.IO; using System.Threading; namespace WcfUploadServiceLib { [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall, ConcurrencyMode = ConcurrencyMode.Multiple)] public class UpLoadService : IUpLoadService { private const int BufferLen = 4096; private static Dictionary<string, int> _uploadInfoDict = new Dictionary<string, int>(); private static object _lockObj = new object(); public void UploadFile(FileUploadData request) { string uploadFolder = @"D:/VisualStudioProject2010/UploadTest/"; string fileName = request.FileName; Stream sourceStream = request.FileData; FileStream targetStream = null; if (!sourceStream.CanRead) { throw new Exception("Invalid Stream!"); } if (!Directory.Exists(uploadFolder)) { Directory.CreateDirectory(uploadFolder); } string filePath = Path.Combine(uploadFolder, fileName); using (targetStream = new FileStream(filePath, FileMode.Create, FileAccess.Write, FileShare.None)) { //read from the input stream in 4K chunks //and save to output stream const int bufferLen = 4096; byte[] buffer = new byte[BufferLen]; int count = 0; SetFileUploadInfo(request.FileUniqueID, 0); while ((count = sourceStream.Read(buffer, 0, BufferLen)) > 0) { targetStream.Write(buffer, 0, count); if (request.FileSize - (int)targetStream.Length > 100000) { SetFileUploadInfo(request.FileUniqueID, (int)targetStream.Length); Thread.Sleep(100); } } SetFileUploadInfo(request.FileUniqueID, request.FileSize); targetStream.Close(); sourceStream.Close(); } } private void SetFileUploadInfo(string id, int savedFileCount) { lock (_lockObj) { if (_uploadInfoDict.ContainsKey(id)) _uploadInfoDict[id] = savedFileCount; else _uploadInfoDict.Add(id, savedFileCount); } } public int GetUploadFileInfo(string id) { if (_uploadInfoDict.ContainsKey(id)) return _uploadInfoDict[id]; else return 0; } } }
定义服务行为的 InstanceContextMode 为 PerCall, 同时 ConcurrencyMode 设为 Multiple (并发),这样就支持多客户端同时上传了。在服务端还定义了一个static的Dictionary: _uploadInfoDict 是用于分别保存各个上传文件的进度。当然设置上传进度时,需要注意同步处理: lock (_lockObj)
private void SetFileUploadInfo(string id, int savedFileCount) { lock (_lockObj) { if (_uploadInfoDict.ContainsKey(id)) _uploadInfoDict[id] = savedFileCount; else _uploadInfoDict.Add(id, savedFileCount); } }

服务端配置:
<bindings> <basicHttpBinding> <binding name="MyBasicBinding" receiveTimeout="00:30:00" messageEncoding="Mtom" maxReceivedMessageSize="9223372036854775807" transferMode="Streamed" sendTimeout="00:30:00" /> </basicHttpBinding> </bindings>
maxReceivedMessageSize 定义了服务端接收Message的最大长度。因为传输时间比较长,所以 receiveTimeout 和 sendTimeout 也需要调整一下。(transferMode设为Streamed即启动了流模式)

客户端UI:
WCF 学习总结7 -- 流模式(Streaming)实现文件上传

客户端实现:
using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Windows.Forms; using System.IO; using System.Threading; namespace WcfUploadClient { public partial class Form1 : Form { public Form1() { InitializeComponent(); } private WcfUploadSvc.UpLoadServiceClient _client; private Thread _uploadWatchThread; private bool _uploadCompleted; private Action<int, int> _updateLabel; private Action<int> _updateProcessBar; private void Form1_Load(object sender, EventArgs e) { _client = new WcfUploadSvc.UpLoadServiceClient(); _uploadCompleted = false; _updateLabel = (curr, total) => { lblInfo.Text = string.Format("{0}KB/{1}KB", curr/1000, total/1000); }; _updateProcessBar = (process) => { progressBar1.Value = process; }; } private void btnUpload_Click(object sender, EventArgs e) { var filepath = textBox1.Text; if (string.IsNullOrEmpty(filepath) || !File.Exists(filepath)) return; _uploadCompleted = false; WcfUploadSvc.FileUploadData uploadData = new WcfUploadSvc.FileUploadData(); uploadData.FileName = Path.GetFileName(filepath); uploadData.FileData = File.OpenRead(filepath); uploadData.FileSize = (int)uploadData.FileData.Length; uploadData.FileUniqueID = Guid.NewGuid().ToString(); _client.UploadFileAsync(uploadData.FileName, uploadData.FileSize, uploadData.FileUniqueID, uploadData.FileData); _client.UploadFileCompleted += new EventHandler<AsyncCompletedEventArgs>(client_UploadFileCompleted); _uploadWatchThread = new Thread(UpdateFileUploadInfo); _uploadWatchThread.Start(uploadData); btnBrowser.Enabled = false; btnUpload.Enabled = false; } void UpdateFileUploadInfo(object obj) { var uploadData = obj as WcfUploadSvc.FileUploadData; var uploadSize = _client.GetUploadFileInfo(uploadData.FileUniqueID); var totalSize = uploadData.FileSize; while (uploadSize < totalSize) { double process = ((double)uploadSize / (double)uploadData.FileSize) * 100; progressBar1.Invoke(_updateProcessBar, (int)process); lblInfo.Invoke(_updateLabel, uploadSize, totalSize); Thread.Sleep(500); if (_uploadCompleted) { progressBar1.Invoke(_updateProcessBar, progressBar1.Maximum); lblInfo.Invoke(_updateLabel, totalSize, totalSize); break; } else { uploadSize = _client.GetUploadFileInfo(uploadData.FileUniqueID); } } } void client_UploadFileCompleted(object sender, AsyncCompletedEventArgs e) { _uploadCompleted = true; btnBrowser.Enabled = true; btnUpload.Enabled = true; } private void btnBrowser_Click(object sender, EventArgs e) { if (openFileDialog1.ShowDialog() == DialogResult.OK) textBox1.Text = openFileDialog1.FileName; } private void Form1_FormClosing(object sender, FormClosingEventArgs e) { if (!_uploadWatchThread.Join(500)) { try { _uploadWatchThread.Abort(); } catch { } } } } }

上传方法使用的是异步方法:
_client.UploadFileAsync
同时上传过程中启动了Watch线程,不断从服务端取得最新的上传进度:
_uploadWatchThread = new Thread(UpdateFileUploadInfo);
_uploadWatchThread.Start(uploadData);

多客户端同时上传:
WCF 学习总结7 -- 流模式(Streaming)实现文件上传


本系列链接:

WCF 学习总结1 -- 简单实例

WCF 学习总结2 -- 配置WCF

WCF 学习总结3 -- 实例模式

WCF 学习总结4 -- 用Duplex实现消息广播

WCF 学习总结5 -- 消息拦截实现用户名验证

WCF 学习总结6 -- WCF参数与返回值

WCF 学习总结7 -- 流模式实现文件上传

WCF 学习总结8 –- WCF 事务处理