主页 > 软件开发  > 

C#ConcurrentQueue使用详解

C#ConcurrentQueue使用详解
总目录
前言

在C#多线程编程中,数据共享如同走钢丝——稍有不慎就会引发竞态条件(Race Condition)或死锁。传统Queue<T>在并发场景下需要手动加锁,而ConcurrentQueue<T>作为.NET Framework 4.0 引入的线程安全集合,采用无锁算法(Lock-Free),能显著提升高并发场景下的性能。今天,我们就来深入探讨一下 ConcurrentQueue<T> 的使用方法和特性。


一、基本信息 1. 基本概念 ConcurrentQueue<T> 是一个线程安全的先进先出(FIFO)队列,属于 System.Collections.Concurrent 命名空间。它遵循先进先出(FIFO)的原则,允许多个线程同时对队列进行操作,而无需额外的锁机制。用于在生产者和消费者场景中高效地处理数据。但需要注意的是,它并不保证元素在同一个线程内入队顺序和出队顺序完全一致。 2. 核心特性速览 1) 线程安全保证 无锁设计:通过CAS(Compare-And-Swap)原子操作实现高效并发 无锁编程:ConcurrentQueue<T> 使用了无锁编程技术,减少了锁的开销,提高了性能。原子操作:队列的入队和出队操作是原子性的,这意味着即使在多线程环境下,操作也不会被打断 FIFO原则:先进先出(但线程间顺序不绝对保证,在多线程环境下,队列的顺序可能会受到线程调度的影响。)高吞吐量:实测在16线程并发下吞吐量可达普通锁队列的3倍+内存高效:采用链表结构动态扩展,避免数组复制的开销 2) 性能对比(基准测试) 操作类型ConcurrentQueueQueue+Lock100万次入队45 ms210 ms100万次出队38 ms195 ms 3. 适用场景 生产者 - 消费者模式(日志记录、任务分发) 在生产者 - 消费者模式中,多个生产者线程同时向队列中放入任务(元素),多个消费者线程从队列中取出任务执行。ConcurrentQueue可以完美适配这种场景,确保数据的安全传递和并发操作的效率。例如,多个网络请求到达服务器(生产者),服务器将这些请求放入ConcurrentQueue,然后多个工作线程从队列中取出请求进行处理(消费者)。 任务调度系统 当需要调度多个任务按照顺序执行时,ConcurrentQueue可以用来存储任务的顺序。多个调度器线程可以从队列中取出任务并分配到合适的资源上执行,保证任务的有序性和并发性。 二、基本操作 1. 初始化队列 var queue = new ConcurrentQueue<string>(); 2. 入队操作(Enqueue) Enqueue方法用于向队列中添加元素。例如: ConcurrentQueue<int> queue = new ConcurrentQueue<int>(); queue.Enqueue(1); queue.Enqueue(2); 在多线程环境下,多个线程可以同时调用Enqueue方法,而不需要担心数据冲突问题。 // 多线程安全添加 Parallel.For(0, 1000, i => { queue .Enqueue($"Item_{i}"); }); 2. 出队操作(TryDequeue) TryDequeue方法尝试从队列中取出一个元素。示例代码如下: int value; if (queue.TryDequeue(out value)) { Console.WriteLine(value); } // 或 if (queue.TryDequeue(out int value2)) { Console.WriteLine(value2); } 如果队列中有元素,TryDequeue会成功取出元素并将队列修改为相应的状态,返回true;如果队列为空,则返回false,value保持其初始值。这一特性使得它在多线程并发访问队列时非常方便,不需要像普通队列那样额外进行线程同步处理。 3. 查看队首元素(TryPeek) TryPeek方法可以查看队列的第一个元素而不将其移除队列。例如: ConcurrentQueue<int> queue= new ConcurrentQueue<int>(); for (int i = 0; i < 10000; i++) { queue.Enqueue(i); } int result = 0; if (!queue.TryPeek(out result)) { Console.WriteLine("TryPeek failed when it should have succeeded"); } else if (result!= 0) { Console.WriteLine($"Expected TryPeek result of 0, got {result}"); } 4. TryGetNonEnumeratedCount 与 Count 1)TryGetNonEnumeratedCount 的作用

TryGetNonEnumeratedCount 是 .NET 6+ 引入的通用集合操作方法,其作用如下:

尝试在不枚举集合的情况下获取元素数量对于实现了ICollection接口的类型(如ConcurrentQueue<T>、ConcurrentBag<T>),直接返回Count属性值避免某些集合类型(如普通IEnumerable)需要枚举才能计数的性能损耗 2)与Count的区别 特性TryGetNonEnumeratedCountCount 属性适用范围所有IEnumerable类型具体集合类型返回值类型bool(是否成功获取)int(直接返回数量)实现机制通过接口检查优化路径直接访问内部计数器对未实现ICollection的集合可能返回false并需要枚举不可用 3) 示例 var queue = new ConcurrentQueue<int>(); queue.Enqueue(1); queue.Enqueue(2); // 传统方式(直接访问 Count 属性) Console.WriteLine($"Count: {queue.Count}"); // 新方式(实现 ICollection 接口的通用方法) if (queue.TryGetNonEnumeratedCount(out int count)) { Console.WriteLine($"Non-enumerated count: {count}"); }

对于ConcurrentQueue<T>,两种方式本质相同。但在编写通用集合处理代码时,TryGetNonEnumeratedCount能更好地兼容各种集合类型,避免对未实现ICollection接口的集合进行低效枚举

5. 其他操作 1)清空队列 // 清空队列(.NET 5+) queue.Clear(); // 注意:非原子操作! 2)IsEmpty

判断集合是否为空(同样存在瞬时性,可能不准确)。

TryDequeue 可能失败,需结合循环或超时机制

while (!queue.IsEmpty) { if (queue.TryDequeue(out int item)) Process(item); } 3)批量操作 // 转换为数组 var snapshot = concurrentQueue.ToArray(); // 复制到目标数组 string[] buffer = new string[100]; concurrentQueue.CopyTo(buffer, 0); 三、为什么需要 ConcurrentQueue?

在多线程环境中,普通的队列(如 Queue<T>)可能会引发线程安全问题。例如,当多个线程同时对队列进行读写操作时,可能会导致数据丢失、异常或程序崩溃。而 ConcurrentQueue<T> 内部实现了高效的线程同步机制,确保了在并发场景下的数据安全。

1. 非线程安全案例 using System.Collections; class Program { static void Main() { // 非线程安全版本(错误示例) var unsafeQueue = new Queue<int>(); Parallel.For(0, 1000, i => { unsafeQueue.Enqueue(i); // 会导致数据丢失或抛出异常 }); Console.WriteLine($"非安全集合数量: {unsafeQueue.Count}"); // 结果通常小于1000 } }

运行结果:

运行代码时,unsafeQueue .Count 通常会小于 1000,甚至可能抛出异常。结果不确定:由于线程竞争是随机的,每次运行的结果可能不同。 2. 为什么不安全? 1) 问题根源 线程不安全的 Queue Queue 是普通的先进先出(FIFO)集合,但不保证多线程并发操作的安全性。当多个线程同时调用 Enqueue() 时,可能发生以下问题: 数据覆盖:多个线程可能同时修改队列的底层数组和内部索引(如 _size 和 _tail),导致写入位置冲突,部分数据被覆盖。容量扩展竞争:当队列需要扩容时,多个线程可能同时触发内部数组的重新分配,导致数据丢失或数组损坏。计数不一致:Count 属性的值可能因线程间竞争而无法正确累加。 Parallel.For 的并发写入 Parallel.For(0, 1000, i => { … }) 会创建多个线程并行执行 Enqueue(i)。 2)错误场景

假设两个线程同时执行 Enqueue():

线程 A 和线程 B 同时读取队列的当前尾部索引 _tail,假设此时 _tail = 5。线程 A 将值写入索引 5,然后更新 _tail 为 6。线程 B 也将值写入索引 5(因为它在步骤 1 中读到的 _tail 是 5),覆盖线程 A 写入的数据。最终队列实际写入的数据少于预期,且 Count 的值可能小于 1000。 3. 解决方案 1)使用线程安全的 ConcurrentQueue<T> var safeQueue = new ConcurrentQueue<int>(); Parallel.For(0, 1000, i => { safeQueue.Enqueue(i); // 线程安全 }); Console.WriteLine($"安全集合数量: {safeQueue.Count}"); // 结果为 1000 ConcurrentQueue 内部通过无锁算法或细粒度锁保证线程安全。 2)手动同步(lock 语句) var unsafeQueue = new Queue<int>(); object lockObj = new object(); Parallel.For(0, 1000, i => { lock (lockObj) { // 强制串行化写入 unsafeQueue.Enqueue(i); } }); 通过锁强制每次 Enqueue 操作串行执行,但会牺牲并发性能。 4. Queue与ConcurrentQueue 与Queue的区别 在普通的Queue<T>中,如果不是线程安全的环境,在多线程同时进行入队和出队操作时可能会产生数据混乱等问题,需要手动进行加锁等操作来保证线程安全。而ConcurrentQueue<T>是线程安全的,不需要额外的锁操作就能正确处理并发情况。 性能优势 在高并发场景下,ConcurrentQueue的非阻塞算法(无锁)相比使用锁的传统队列有更好的性能。例如,普通使用锁的入队和出队操作(如下代码),在高并发时会导致线程频繁阻塞和唤醒:而ConcurrentQueue通过原子操作避免了线程阻塞,提高了并发处理效率。 public class LockedQueue<T> { private Queue<T> _queue = new Queue<T>(); private object _lock = new object(); public void Enqueue(T item) { lock (_lock) { _queue.Enqueue(item); } } public bool TryDequeue(out T result) { lock (_lock) { if (_queue.Count > 0) { result = _queue.Dequeue(); return true; } result = default; return false; } } } 5. 使用示例 using System; using System.Collections.Concurrent; using System.Threading.Tasks; public class Program { static void Main() { ConcurrentQueue<int> queue = new ConcurrentQueue<int>(); // 生产者线程 Task producer = Task.Run(() => { for (int i = 0; i < 10; i++) { queue.Enqueue(i); Console.WriteLine($"Enqueued: {i}"); } }); // 消费者线程 Task consumer = Task.Run(() => { while (true) { if (queue.TryDequeue(out int result)) { Console.WriteLine($"Dequeued: {result}"); } } }); Task.WaitAll(producer, consumer); } }

在这个示例中,生产者线程负责向队列中添加数据,消费者线程负责从队列中移除数据。由于 ConcurrentQueue<T> 的线程安全性,我们无需担心线程冲突问题。

四、典型应用场景 1. 生产者-消费者模式(带优雅关闭) public class PipelineExample { private readonly ConcurrentQueue<DataPacket> _queue = new(); private readonly CancellationTokenSource _cts = new(); public void StartProcessing(int consumerCount) { // 生产者线程 Task.Run(() => { while (!_cts.IsCancellationRequested) { var data = ReceiveNetworkPacket(); _queue.Enqueue(data); } }); // 消费者线程池 Parallel.For(0, consumerCount, i => { while (true) { if (_queue.TryDequeue(out var data)) { ProcessData(data); } else if (_cts.IsCancellationRequested) { break; } else { SpinWait.SpinUntil(() => !_queue.IsEmpty || _cts.IsCancellationRequested); } } }); } public void Stop() => _cts.Cancel(); } ConcurrentQueue<SensorData> dataQueue = new(); // 生产者线程 Task.Run(() => { while (true) { var data = ReadSensor(); dataQueue.Enqueue(data); Thread.Sleep(100); } }); // 消费者线程 Task.Run(() => { while (true) { if (dataQueue.TryDequeue(out SensorData data)) { SaveToDatabase(data); } else { Thread.Sleep(50); // 降低CPU占用 } } }); 2. 高并发日志系统设计 public static class AsyncLogger { private static readonly ConcurrentQueue<string> _logQueue = new(); private static readonly AutoResetEvent _signal = new(false); static AsyncLogger() { Task.Run(() => { using var writer = new StreamWriter("app.log"); while (true) { _signal.WaitOne(); while (_logQueue.TryDequeue(out var message)) { writer.WriteLine($"[{DateTime.UtcNow:O}] {message}"); } writer.Flush(); } }); } public static void Log(string message) { _logQueue.Enqueue(message); _signal.Set(); } } 五、注意事项 元素顺序的相对性 虽然ConcurrentQueue遵循FIFO原则,但是由于并发操作的存在,同一个线程内先入队的元素可能会后出队。在编写代码时需要考虑到这种情况,避免对元素顺序有过于严格的预期。虽然号称FIFO,但在以下场景可能出现顺序异常: // 线程A cq.Enqueue(1); // 时间戳T1 cq.Enqueue(2); // T2 // 线程B cq.Enqueue(3); // T1.5 // 可能出队顺序:1 → 3 → 2 内存管理 在高频率入队和出队操作中,要注意内存的使用情况,因为队列中的元素可能会随着时间不断积累(如果没有及时消费),可能会导致内存占用过高。对象池模式:复用出队对象,减少GC压力容量监控:定期检查cq.Count,设置阈值报警 // 对象池示例 var objectPool = new ObjectPool<DataModel>(() => new DataModel()); var item = objectPool.Get(); try { // 使用item... } finally { objectPool.Return(item); } 避免频繁计数:Count 属性需要遍历链表,复杂度O(n) 六、 替代方案

当需要线程安全的先进先出集合时,ConcurrentQueue<T>通常是首选。但在以下场景需考虑替代方案:

优先级队列 → PriorityQueue(.NET 6+)延迟处理 → System.Threading.Channels跨进程通信 → MemoryMappedFile + 环形缓冲区在需要阻塞操作时考虑结合 BlockingCollection

与其他并发容器的对比

特性ConcurrentQueueBlockingCollectionChannels阻塞操作❌✔️✔️ (.NET Core+)边界控制❌✔️✔️内存效率高中高适用场景非阻塞队列有界集合异步管道
结语

回到目录页:C#/.NET 知识汇总 希望以上内容可以帮助到大家,如文中有不对之处,还请批评指正。


参考资料: ConcurrentQueue<T> 类

标签:

C#ConcurrentQueue使用详解由讯客互联软件开发栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“C#ConcurrentQueue使用详解