測試函數
/// <summary>
/// Runs the producer/consumer demo: one producer fills a shared concurrent queue while
/// four processors poll it, and the processors are cancelled two seconds after the
/// producer has finished.
/// </summary>
/// <returns>A task that completes once the producer and all four processors have finished.</returns>
static async Task RunProgram()
{
    var taskQueue = new ConcurrentQueue<CustomTask>();

    // CancellationTokenSource implements IDisposable, so scope it with a using statement.
    using (var cts = new CancellationTokenSource())
    {
        // Read the token once, up front, so the processor lambdas never touch the
        // source object itself.
        CancellationToken token = cts.Token;

        // Generate work items and add them to the concurrent queue.
        var taskSource = Task.Run(() => TaskProducer(taskQueue));

        // Start four processors that consume the queue at the same time.
        Task[] processors = new Task[4];
        for (int i = 1; i <= 4; i++)
        {
            // Copy the loop counter into a local so each lambda captures its own value.
            string processId = i.ToString();
            processors[i - 1] = Task.Run(
                () => TaskProcessor(taskQueue, "Processor " + processId, token));
        }

        await taskSource;

        // Send the cancellation signal to the processors after a two-second grace period.
        cts.CancelAfter(TimeSpan.FromSeconds(2));
        await Task.WhenAll(processors);
    }
}
產生任務
/// <summary>
/// Posts twenty work items (ids 0 through 19) onto the queue, waiting 50 ms before
/// each one and writing a console line after each enqueue.
/// </summary>
/// <param name="queue">Queue that receives the generated work items.</param>
/// <returns>A task that completes after the last item has been enqueued and logged.</returns>
static async Task TaskProducer(ConcurrentQueue<CustomTask> queue)
{
    int nextId = 0;
    while (nextId < 20)
    {
        await Task.Delay(50);

        var posted = new CustomTask();
        posted.Id = nextId;
        queue.Enqueue(posted);
        Console.WriteLine("task {0} has been posted", posted.Id);

        nextId++;
    }
}
執行任務
/// <summary>
/// Polls the queue for work items until cancellation is requested, logging every item
/// it manages to dequeue. A random delay is awaited before the first poll and after
/// every poll.
/// </summary>
/// <param name="queue">Queue to take work items from.</param>
/// <param name="name">Label written to the console alongside each processed item.</param>
/// <param name="token">Token that ends the polling loop once cancellation is requested.</param>
/// <returns>A task that completes when the loop observes the cancellation request.</returns>
static async Task TaskProcessor(ConcurrentQueue<CustomTask> queue, string name, CancellationToken token)
{
    await GetRandomDelay();

    while (true)
    {
        CustomTask item;
        if (queue.TryDequeue(out item))
        {
            Console.WriteLine("task {0} has been processed by {1}", item.Id, name);
        }

        await GetRandomDelay();

        // The token is only inspected after the delay, so at least one poll always
        // happens; items still queued at that point are left unprocessed.
        if (token.IsCancellationRequested)
        {
            return;
        }
    }
}
/// <summary>
/// Creates a delay task whose duration is picked at random from 0 up to (but not
/// including) 1500 milliseconds.
/// </summary>
/// <returns>A task that completes after the randomly chosen delay.</returns>
static Task GetRandomDelay()
{
    // Seed from a fresh Guid rather than the clock's millisecond field: the millisecond
    // offers only 1000 distinct seeds, and callers that hit the same millisecond would
    // all receive the same "random" delay.
    int delay = new Random(Guid.NewGuid().GetHashCode()).Next(1500);
    return Task.Delay(delay);
}
/// <summary>
/// Work item that travels through the queue; it carries only an identifier.
/// </summary>
class CustomTask
{
    /// <summary>Identifier of the work item, printed in the console output.</summary>
    public int Id
    {
        get;
        set;
    }
}
調用
/// <summary>
/// Console entry point: runs the demo to completion, then waits for a key press so
/// the output stays on screen.
/// </summary>
/// <param name="args">Command-line arguments; not used.</param>
static void Main(string[] args)
{
    // Block until the whole demo has finished.
    RunProgram().Wait();
    Console.ReadKey();
}