前言:
异步编程是一种编程模式,它允许程序在执行过程中不阻塞主线程,以非阻塞的方式执行某些任务,并在任务完成时返回结果。在异步编程中,使用异步操作来执行这些任务,这样可以提高程序的的可响应性和性能。本文主要讲述在C#中使用异步编程需要注意哪些方式。
一.使用异步方法返回值应当避免使用void
static void Main(string[] args) { try { // 堆代码 duidaima.com // 如果Run方法无异常正常执行,那么程序无法得知其状态机什么时候执行完毕 Run(); } catch (Exception ex) { Console.WriteLine(ex.Message); } Console.Read(); } static async void Run() { // 由于方法返回的为void,所以在调用此方法时无法捕捉异常,使得进程崩溃 throw new Exception("异常了"); await Task.Run(() => { }); }☑️应该将异步函数返回Task
static async Task Main(string[] args) { try { // 因为在此进行await,所以主程序知道什么时候状态机执行完成 await RunAsync(); Console.Read(); } catch (Exception ex) { Console.WriteLine(ex.Message); } } static async Task RunAsync() { // 因为此异步方法返回的为Task,所以此异常可以被捕捉 throw new Exception("异常了"); await Task.Run(() => { }); }注:事件是一个例外,异步事件也是返回void
public async Task<int> RunAsync() { return await Task.Run(()=>1+1); }☑️而应该使用Task.FromResult代替
public async Task<int> RunAsync() { return await Task.FromResult(1 + 1); }还有另外一种代替方法,那就是使用ValueTask类型,ValueTask是一个可被等待异步结构,所以并不会在堆中分配内存和任务分配,从而性能更优化.
static async Task Main(string[] args) { await AddAsync(1, 1); } static ValueTask<int> AddAsync(int a, int b) { // 返回一个可被等待的ValueTask类型 return new ValueTask<int>(a + b); }注: ValueTask结构是C#7.0加入的,存在于Sysntem,Threading.Task.Extensions包中
public class QueueProcessor { private readonly BlockingCollection<Message> _messageQueue = new BlockingCollection<Message>(); public void StartProcessing() { Task.Run(ProcessQueue); } public void Enqueue(Message message) { _messageQueue.Add(message); } private void ProcessQueue() { foreach (var item in _messageQueue.GetConsumingEnumerable()) { ProcessItem(item); } } private void ProcessItem(Message message) { } }☑️所以应该改成这样
public class QueueProcessor { private readonly BlockingCollection<Message> _messageQueue = new BlockingCollection<Message>(); public void StartProcessing() { var thread = new Thread(ProcessQueue) { // 设置线程为背后线程,使得在主线程结束时此线程也会自动结束 IsBackground = true }; thread.Start(); } public void Enqueue(Message message) { _messageQueue.Add(message); } private void ProcessQueue() { foreach (var item in _messageQueue.GetConsumingEnumerable()) { ProcessItem(item); } } private void ProcessItem(Message message) { } }🔔线程池内线程增加会导致在执行时大量的进行上下文切换,从而浪费程序的整体性能。
// 此方法会创建一个新线程进行执行 Task.Factory.StartNew(() => { }, TaskCreationOptions.LongRunning);四.避免使用Task.Result和Task.Wait()来堵塞线程
async Task<string> RunAsync() { // 此线程ID输出与UI线程ID不一致 Debug.WriteLine("UI线程:"+Thread.CurrentThread.ManagedThreadId); return await Task.Run(() => "Run"); } string DoOperationBlocking() { // 这种方法虽然摆脱了死锁的问题,但是也导致了上下文问题,RunAsync不在以UI线程调用 // Result和Wait()方法如果出现异常,异常将被包装为AggregateException进行抛出, return Task.Run(() => RunAsync()).Result; } } private async void button1_Click(object sender, EventArgs e) { Debug.WriteLine("RunAsync:" + Thread.CurrentThread.ManagedThreadId); Debug.WriteLine(DoOperationBlocking()); } public string DoOperationBlocking2() { // 此方法也是会导致上下文问题, // GetAwaiter()方法对异常不会包装 return Task.Run(() => RunAsync()).GetAwaiter().GetResult(); }五.建议使用await来代替continueWith任务
private void button1_Click(object sender, EventArgs e) { Debug.WriteLine("UI线程:" + Thread.CurrentThread.ManagedThreadId); RunAsync().ContinueWith(task => { Console.WriteLine("RunAsync returned:"+task.Result); // 因为是使用的continueWith,所以线程ID与UI线程并不一致 Debug.WriteLine("ContinueWith:" + Thread.CurrentThread.ManagedThreadId); }); } public async Task<int> RunAsync() { return await Task.FromResult(1 + 1); }☑️应该使用await来代替continueWith
private async void button1_Click(object sender, EventArgs e) { Debug.WriteLine("UI线程:" + Thread.CurrentThread.ManagedThreadId); Debug.WriteLine("RunAsync returned:"+ await RunAsync()); Debug.WriteLine("UI线程:" + Thread.CurrentThread.ManagedThreadId); } public async Task<int> RunAsync() { return await Task.FromResult(1 + 1); }六.创建TaskCompletionSource时建议使用TaskCreationOptions.RunContinuationsAsynchronously属性
static void Main(string[] args) { ThreadPool.SetMinThreads(100, 100); Console.WriteLine("Main CurrentManagedThreadId:" + Environment.CurrentManagedThreadId); var tcs = new TaskCompletionSource<bool>(); // 使用TaskContinuationOptions.ExecuteSynchronously来测试延续任务 ContinueWith(1, tcs.Task); // 测试await延续任务 ContinueAsync(2, tcs.Task); Task.Run(() => { Console.WriteLine("Task Run CurrentManagedThreadId:" + Environment.CurrentManagedThreadId ); tcs.TrySetResult(true); }); Console.ReadLine(); } static void print(int id) => Console.WriteLine($"continuation:{id}\tCurrentManagedThread:{Environment.CurrentManagedThreadId}"); static async Task ContinueAsync(int id, Task task) { await task.ConfigureAwait(false); print(id); } static Task ContinueWith(int id, Task task) { return task.ContinueWith( t => print(id), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); }☑️所以应该改为使用TaskCreationOptions.RunComtinuationsAsynchronously参数进行设置TaskCompletionSoure
static void Main(string[] args) { ThreadPool.SetMinThreads(100, 100); Console.WriteLine("Main CurrentManagedThreadId:" + Environment.CurrentManagedThreadId); var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously); // 使用TaskContinuationOptions.ExecuteSynchronously来测试延续任务 ContinueWith(1, tcs.Task); // 测试await延续任务 ContinueAsync(2, tcs.Task); Task.Run(() => { Console.WriteLine("Task Run CurrentManagedThreadId:" + Environment.CurrentManagedThreadId); tcs.TrySetResult(true); }); Console.ReadLine(); } static void print(int id) => Console.WriteLine($"continuation:{id}\tCurrentManagedThread:{Environment.CurrentManagedThreadId}"); static async Task ContinueAsync(int id, Task task) { await task.ConfigureAwait(false); print(id); } static Task ContinueWith(int id, Task task) { return task.ContinueWith( t => print(id), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); }🔔TaskCreationOptions.RunContinuationsAsynchronously属性和TaskContinuationOptions.RunContinuationsAsynchronously很相似,但请注意它们的使用方式
public async Task<Stream> HttpClientAsyncWithCancellationBad() { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); // 堆代码 duidaima.com using (var client = _httpClientFactory.CreateClient()) { var response = await client.GetAsync("http://backend/api/1", cts.Token); return await response.Content.ReadAsStreamAsync(); } }☑️所以应该及时的释放CancellationSoure,使得正确的从队列中删除计时器
public async Task<Stream> HttpClientAsyncWithCancellationGood() { using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10))) { using (var client = _httpClientFactory.CreateClient()) { var response = await client.GetAsync("http://backend/api/1", cts.Token); return await response.Content.ReadAsStreamAsync(); } } }🔔设置延迟时间具有两种方式
public CancellationTokenSource(TimeSpan delay); public CancellationTokenSource(int millisecondsDelay);2.调用实例对象CancelAfter()
public void CancelAfter(TimeSpan delay); public void CancelAfter(int millisecondsDelay);八.建议将协作式取消对象(CancellationToken)传递给所有使用到的API
public async Task<string> DoAsyncThing(CancellationToken cancellationToken = default) { byte[] buffer = new byte[1024]; // 使用FileOptions.Asynchronous参数指定异步通信 using(Stream stream = new FileStream( @"d:\资料\Blogs\Task\TaskTest", FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.None, 1024, options:FileOptions.Asynchronous)) { // 由于并没有将cancellationToken传递给ReadAsync,所以无法进行有效的取消 int read = await stream.ReadAsync(buffer, 0, buffer.Length); return Encoding.UTF8.GetString(buffer, 0, read); } }☑️所以应该将CancellationToken传递给ReadAsync(),以达到有效的取消
public async Task<string> DoAsyncThing(CancellationToken cancellationToken = default) { byte[] buffer = new byte[1024]; // 使用FileOptions.Asynchronous参数指定异步通信 using(Stream stream = new FileStream( @"d:\资料\Blogs\Task\TaskTest", FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.None, 1024, options:FileOptions.Asynchronous)) { // 由于并没有将cancellationToken传递给ReadAsync,所以无法进行有效的取消 int read = await stream.ReadAsync(buffer, 0, buffer.Length,cancellationToken); return Encoding.UTF8.GetString(buffer, 0, read); } }🔔在使用异步IO时,应该将options参数设置为FileOptions.Asynchronous,否则会产生额外的线程浪费,详细信息请参考CLR中28.12节
public static async Task<T> WithCancellation<T>(this Task<T> task, CancellationToken cancellationToken) { // 没有方法释放cancellationToken注册 var delayTask = Task.Delay(-1, cancellationToken); var resultTask = await Task.WhenAny(task, delayTask); if (resultTask == delayTask) { // 取消异步操作 throw new OperationCanceledException(); } return await task; }:ballot_box_with_check:所以应该改成下面这样,在任务一完成,就释放CancellationTokenRegistry
public static async Task<T> WithCancellation<T>(this Task<T> task, CancellationToken cancellationToken) { var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously); using (cancellationToken.Register(state => { // 这样将在其中一个任务触发时立即释放CancellationTokenRegistry ((TaskCompletionSource<object>)state).TrySetResult(null); }, tcs)) { var resultTask = await Task.WhenAny(task, tcs.Task); if (resultTask == tcs.Task) { // 取消异步操作 throw new OperationCanceledException(cancellationToken); } return await task; } }使用超时任务
public static async Task<T> TimeoutAfter<T>(this Task<T> task, TimeSpan timeout) { var delayTask = Task.Delay(timeout); var resultTask = await Task.WhenAny(task, delayTask); if (resultTask == delayTask) { // 取消异步操作 throw new OperationCanceledException(); } return await task; }:ballot_box_with_check:应改成下面这样,这样将在任务完成之后,取消计时器的操作
public static async Task<T> TimeoutAfter<T>(this Task<T> task, TimeSpan timeout) { using (var cts = new CancellationTokenSource()) { var delayTask = Task.Delay(timeout, cts.Token); var resultTask = await Task.WhenAny(task, delayTask); if (resultTask == delayTask) { // 取消异步操作 throw new OperationCanceledException(); } else { // 取消计时器任务 cts.Cancel(); } return await task; } }十.使用StreamWriter(s)或Stream(s)时在Dispose之前建议先调用FlushAsync
public async static Task RunAsync() { using (var streamWriter = new StreamWriter(@"C:\资料\Blogs\Task")) { // 由于没有调用FlushAsync,所以最后是以同步方式进行write/flush的 await streamWriter.WriteAsync("Hello World"); } }☑️所以应该改为下面这样,在Dispose之前调用FlushAsync()
public async static Task RunAsync() { using (var streamWriter = new StreamWriter(@"C:\资料\Blogs\Task")) { await streamWriter.WriteAsync("Hello World"); // 调用FlushAsync() 使其使用异步write/flush await streamWriter.FlushAsync(); } }十一.建议使用 async/await而不是直接返回Task
public Task<int> RunAsync() { return Task.FromResult(1 + 1); }☑️所以应该使用async/await来代替返回Task
public async Task<int> RunAsync() { return await Task.FromResult(1 + 1); }🔔使用async/await来代替返回Task时,还有性能上的考虑,虽然直接Task会更快,但是最终却改变了异步的行为,失去了异步状态机的一些好处
public class Pinger { private readonly Timer _timer; private readonly HttpClient _client; public Pinger(HttpClient client) { _client = new HttpClient(); _timer = new Timer(Heartbeat, null, 1000, 1000); } public async void Heartbeat(object state) { await httpClient.GetAsync("http://mybackend/api/ping"); } }❌下面例子将阻止计时器回调,这有可能导致线程池中线程耗尽,这也是一个异步差于同步的例子
public class Pinger { private readonly Timer _timer; private readonly HttpClient _client; public Pinger(HttpClient client) { _client = new HttpClient(); _timer = new Timer(Heartbeat, null, 1000, 1000); } public void Heartbeat(object state) { httpClient.GetAsync("http://mybackend/api/ping").GetAwaiter().GetResult(); } }☑️下面例子是使用基于的异步的方法,并在定时器回调函数中丢弃该任务,并且如果此方法抛出异常,则也不会关闭进程,而是会触发TaskScheduler.UnobservedTaskException事件
public class Pinger { private readonly Timer _timer; private readonly HttpClient _client; public Pinger(HttpClient client) { _client = new HttpClient(); _timer = new Timer(Heartbeat, null, 1000, 1000); } public void Heartbeat(object state) { _ = DoAsyncPing(); } private async Task DoAsyncPing() { // 异步等待 await _client.GetAsync("http://mybackend/api/ping"); }2.创建回调函数参数时注意避免 async void
public class BackgroundQueue { public static void FireAndForget(Action action) { } } static async Task Main(string[] args) { var httpClient = new HttpClient(); // 因为方法类型是Action,所以只能使用async void BackgroundQueue.FireAndForget(async () => { await httpClient.GetAsync("http://pinger/api/1"); }); }☑️所以应该构建一个回调异步方法的重载
public class BackgroundQueue { public static void FireAndForget(Action action) { } public static void FireAndForget(Func<Task> action) { } }3.使用ConcurrentDictionary.GetOrAdd注意场景
public class PersonController : Controller { private AppDbContext _db; private static ConcurrentDictionary<int, Person> _cache = new ConcurrentDictionary<int, Person>(); public PersonController(AppDbContext db) { _db = db; } public IActionResult Get(int id) { // 如果不存在缓存数据,则会进入堵塞状态 var person = _cache.GetOrAdd(id, (key) => db.People.FindAsync(key).Result); return Ok(person); } }☑️可以改成缓存线程本身,而不是结果,这样将不会导致线程池饥饿
public class PersonController : Controller { private AppDbContext _db; private static ConcurrentDictionary<int, Task<Person>> _cache = new ConcurrentDictionary<int, Task<Person>>(); public PersonController(AppDbContext db) { _db = db; } public async Task<IActionResult> Get(int id) { // 因为缓存的是线程本身,所以没有进行堵塞,也就不会产生线程池饥饿 var person = await _cache.GetOrAdd(id, (key) => db.People.FindAsync(key)); return Ok(person); } }🔔这种方法,在最后,GetOrAdd()可能并行多次来执行缓存回调,这可能导致启动多次昂贵的计算。
public class PersonController : Controller { private AppDbContext _db; private static ConcurrentDictionary<int, AsyncLazy<Person>> _cache = new ConcurrentDictionary<int, AsyncLazy<Person>>(); public PersonController(AppDbContext db) { _db = db; } public async Task<IActionResult> Get(int id) { // 使用Lazy进行了延迟加载(使用时调用),解决了多次执行回调问题 var person = await _cache.GetOrAdd(id, (key) => new AsyncLazy<Person>(() => db.People.FindAsync(key))); return Ok(person); } private class AsyncLazy<T> : Lazy<Task<T>> { public AsyncLazy(Func<Task<T>> valueFactory) : base(valueFactory) { } }4.构造函数对于异步的问题
public interface IRemoteConnectionFactory { Task<IRemoteConnection> ConnectAsync(); } public interface IRemoteConnection { Task PublishAsync(string channel, string message); Task DisposeAsync(); }
public class Service : IService { private readonly IRemoteConnection _connection; public Service(IRemoteConnectionFactory connectionFactory) { _connection = connectionFactory.ConnectAsync().Result; } }☑️正确的方式应该使用静态工厂模式进行异步连接
public class Service : IService { private readonly IRemoteConnection _connection; private Service(IRemoteConnection connection) { _connection = connection; } public static async Task<Service> CreateAsync(IRemoteConnectionFactory connectionFactory) { return new Service(await connectionFactory.ConnectAsync()); } }