1use super::context::Context;
4use super::error::*;
5use super::select::{Operation, Selected, Token};
6use super::utils::{Backoff, CachePadded};
7use super::waker::SyncWaker;
8use crate::cell::UnsafeCell;
9use crate::marker::PhantomData;
10use crate::mem::MaybeUninit;
11use crate::ptr;
12use crate::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
13use crate::time::Instant;
14
15const WRITE: usize = 1;
20const READ: usize = 2;
21const DESTROY: usize = 4;
22
23const LAP: usize = 32;
25const BLOCK_CAP: usize = LAP - 1;
27const SHIFT: usize = 1;
29const MARK_BIT: usize = 1;
33
34struct Slot<T> {
36 msg: UnsafeCell<MaybeUninit<T>>,
38
39 state: AtomicUsize,
41}
42
43impl<T> Slot<T> {
44 fn wait_write(&self) {
46 let backoff = Backoff::new();
47 while self.state.load(Ordering::Acquire) & WRITE == 0 {
48 backoff.spin_heavy();
49 }
50 }
51}
52
53struct Block<T> {
57 next: AtomicPtr<Block<T>>,
59
60 slots: [Slot<T>; BLOCK_CAP],
62}
63
64impl<T> Block<T> {
65 fn new() -> Box<Block<T>> {
67 unsafe { Box::new_zeroed().assume_init() }
74 }
75
76 fn wait_next(&self) -> *mut Block<T> {
78 let backoff = Backoff::new();
79 loop {
80 let next = self.next.load(Ordering::Acquire);
81 if !next.is_null() {
82 return next;
83 }
84 backoff.spin_heavy();
85 }
86 }
87
88 unsafe fn destroy(this: *mut Block<T>, start: usize) {
90 for i in start..BLOCK_CAP - 1 {
93 let slot = unsafe { (*this).slots.get_unchecked(i) };
94
95 if slot.state.load(Ordering::Acquire) & READ == 0
97 && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
98 {
99 return;
101 }
102 }
103
104 drop(unsafe { Box::from_raw(this) });
106 }
107}
108
109#[derive(Debug)]
111struct Position<T> {
112 index: AtomicUsize,
114
115 block: AtomicPtr<Block<T>>,
117}
118
119#[derive(Debug)]
121pub(crate) struct ListToken {
122 block: *const u8,
124
125 offset: usize,
127}
128
129impl Default for ListToken {
130 #[inline]
131 fn default() -> Self {
132 ListToken { block: ptr::null(), offset: 0 }
133 }
134}
135
136pub(crate) struct Channel<T> {
144 head: CachePadded<Position<T>>,
146
147 tail: CachePadded<Position<T>>,
149
150 receivers: SyncWaker,
152
153 _marker: PhantomData<T>,
155}
156
157impl<T> Channel<T> {
158 pub(crate) fn new() -> Self {
160 Channel {
161 head: CachePadded::new(Position {
162 block: AtomicPtr::new(ptr::null_mut()),
163 index: AtomicUsize::new(0),
164 }),
165 tail: CachePadded::new(Position {
166 block: AtomicPtr::new(ptr::null_mut()),
167 index: AtomicUsize::new(0),
168 }),
169 receivers: SyncWaker::new(),
170 _marker: PhantomData,
171 }
172 }
173
174 fn start_send(&self, token: &mut Token) -> bool {
176 let backoff = Backoff::new();
177 let mut tail = self.tail.index.load(Ordering::Acquire);
178 let mut block = self.tail.block.load(Ordering::Acquire);
179 let mut next_block = None;
180
181 loop {
182 if tail & MARK_BIT != 0 {
184 token.list.block = ptr::null();
185 return true;
186 }
187
188 let offset = (tail >> SHIFT) % LAP;
190
191 if offset == BLOCK_CAP {
193 backoff.spin_heavy();
194 tail = self.tail.index.load(Ordering::Acquire);
195 block = self.tail.block.load(Ordering::Acquire);
196 continue;
197 }
198
199 if offset + 1 == BLOCK_CAP && next_block.is_none() {
202 next_block = Some(Block::<T>::new());
203 }
204
205 if block.is_null() {
208 let new = Box::into_raw(Block::<T>::new());
209
210 if self
211 .tail
212 .block
213 .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
214 .is_ok()
215 {
216 self.head.block.store(new, Ordering::Release);
217 block = new;
218 } else {
219 next_block = unsafe { Some(Box::from_raw(new)) };
220 tail = self.tail.index.load(Ordering::Acquire);
221 block = self.tail.block.load(Ordering::Acquire);
222 continue;
223 }
224 }
225
226 let new_tail = tail + (1 << SHIFT);
227
228 match self.tail.index.compare_exchange_weak(
230 tail,
231 new_tail,
232 Ordering::SeqCst,
233 Ordering::Acquire,
234 ) {
235 Ok(_) => unsafe {
236 if offset + 1 == BLOCK_CAP {
238 let next_block = Box::into_raw(next_block.unwrap());
239 self.tail.block.store(next_block, Ordering::Release);
240 self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
241 (*block).next.store(next_block, Ordering::Release);
242 }
243
244 token.list.block = block as *const u8;
245 token.list.offset = offset;
246 return true;
247 },
248 Err(_) => {
249 backoff.spin_light();
250 tail = self.tail.index.load(Ordering::Acquire);
251 block = self.tail.block.load(Ordering::Acquire);
252 }
253 }
254 }
255 }
256
257 pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
259 if token.list.block.is_null() {
261 return Err(msg);
262 }
263
264 let block = token.list.block as *mut Block<T>;
266 let offset = token.list.offset;
267 unsafe {
268 let slot = (*block).slots.get_unchecked(offset);
269 slot.msg.get().write(MaybeUninit::new(msg));
270 slot.state.fetch_or(WRITE, Ordering::Release);
271 }
272
273 self.receivers.notify();
275 Ok(())
276 }
277
278 fn start_recv(&self, token: &mut Token) -> bool {
280 let backoff = Backoff::new();
281 let mut head = self.head.index.load(Ordering::Acquire);
282 let mut block = self.head.block.load(Ordering::Acquire);
283
284 loop {
285 let offset = (head >> SHIFT) % LAP;
287
288 if offset == BLOCK_CAP {
290 backoff.spin_heavy();
291 head = self.head.index.load(Ordering::Acquire);
292 block = self.head.block.load(Ordering::Acquire);
293 continue;
294 }
295
296 let mut new_head = head + (1 << SHIFT);
297
298 if new_head & MARK_BIT == 0 {
299 atomic::fence(Ordering::SeqCst);
300 let tail = self.tail.index.load(Ordering::Relaxed);
301
302 if head >> SHIFT == tail >> SHIFT {
304 if tail & MARK_BIT != 0 {
306 token.list.block = ptr::null();
308 return true;
309 } else {
310 return false;
312 }
313 }
314
315 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
317 new_head |= MARK_BIT;
318 }
319 }
320
321 if block.is_null() {
324 backoff.spin_heavy();
325 head = self.head.index.load(Ordering::Acquire);
326 block = self.head.block.load(Ordering::Acquire);
327 continue;
328 }
329
330 match self.head.index.compare_exchange_weak(
332 head,
333 new_head,
334 Ordering::SeqCst,
335 Ordering::Acquire,
336 ) {
337 Ok(_) => unsafe {
338 if offset + 1 == BLOCK_CAP {
340 let next = (*block).wait_next();
341 let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
342 if !(*next).next.load(Ordering::Relaxed).is_null() {
343 next_index |= MARK_BIT;
344 }
345
346 self.head.block.store(next, Ordering::Release);
347 self.head.index.store(next_index, Ordering::Release);
348 }
349
350 token.list.block = block as *const u8;
351 token.list.offset = offset;
352 return true;
353 },
354 Err(_) => {
355 backoff.spin_light();
356 head = self.head.index.load(Ordering::Acquire);
357 block = self.head.block.load(Ordering::Acquire);
358 }
359 }
360 }
361 }
362
363 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
365 if token.list.block.is_null() {
366 return Err(());
368 }
369
370 let block = token.list.block as *mut Block<T>;
372 let offset = token.list.offset;
373 unsafe {
374 let slot = (*block).slots.get_unchecked(offset);
375 slot.wait_write();
376 let msg = slot.msg.get().read().assume_init();
377
378 if offset + 1 == BLOCK_CAP {
381 Block::destroy(block, 0);
382 } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
383 Block::destroy(block, offset + 1);
384 }
385
386 Ok(msg)
387 }
388 }
389
390 pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
392 self.send(msg, None).map_err(|err| match err {
393 SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
394 SendTimeoutError::Timeout(_) => unreachable!(),
395 })
396 }
397
398 pub(crate) fn send(
400 &self,
401 msg: T,
402 _deadline: Option<Instant>,
403 ) -> Result<(), SendTimeoutError<T>> {
404 let token = &mut Token::default();
405 assert!(self.start_send(token));
406 unsafe { self.write(token, msg).map_err(SendTimeoutError::Disconnected) }
407 }
408
409 pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
411 let token = &mut Token::default();
412
413 if self.start_recv(token) {
414 unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
415 } else {
416 Err(TryRecvError::Empty)
417 }
418 }
419
420 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
422 let token = &mut Token::default();
423 loop {
424 if self.start_recv(token) {
425 unsafe {
426 return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
427 }
428 }
429
430 if let Some(d) = deadline {
431 if Instant::now() >= d {
432 return Err(RecvTimeoutError::Timeout);
433 }
434 }
435
436 Context::with(|cx| {
438 let oper = Operation::hook(token);
439 self.receivers.register(oper, cx);
440
441 if !self.is_empty() || self.is_disconnected() {
443 let _ = cx.try_select(Selected::Aborted);
444 }
445
446 let sel = unsafe { cx.wait_until(deadline) };
449
450 match sel {
451 Selected::Waiting => unreachable!(),
452 Selected::Aborted | Selected::Disconnected => {
453 self.receivers.unregister(oper).unwrap();
454 }
457 Selected::Operation(_) => {}
458 }
459 });
460 }
461 }
462
463 pub(crate) fn len(&self) -> usize {
465 loop {
466 let mut tail = self.tail.index.load(Ordering::SeqCst);
468 let mut head = self.head.index.load(Ordering::SeqCst);
469
470 if self.tail.index.load(Ordering::SeqCst) == tail {
472 tail &= !((1 << SHIFT) - 1);
474 head &= !((1 << SHIFT) - 1);
475
476 if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
478 tail = tail.wrapping_add(1 << SHIFT);
479 }
480 if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
481 head = head.wrapping_add(1 << SHIFT);
482 }
483
484 let lap = (head >> SHIFT) / LAP;
486 tail = tail.wrapping_sub((lap * LAP) << SHIFT);
487 head = head.wrapping_sub((lap * LAP) << SHIFT);
488
489 tail >>= SHIFT;
491 head >>= SHIFT;
492
493 return tail - head - tail / LAP;
495 }
496 }
497 }
498
499 pub(crate) fn capacity(&self) -> Option<usize> {
501 None
502 }
503
504 pub(crate) fn disconnect_senders(&self) -> bool {
508 let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
509
510 if tail & MARK_BIT == 0 {
511 self.receivers.disconnect();
512 true
513 } else {
514 false
515 }
516 }
517
518 pub(crate) fn disconnect_receivers(&self) -> bool {
522 let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
523
524 if tail & MARK_BIT == 0 {
525 self.discard_all_messages();
528 true
529 } else {
530 false
531 }
532 }
533
534 fn discard_all_messages(&self) {
538 let backoff = Backoff::new();
539 let mut tail = self.tail.index.load(Ordering::Acquire);
540 loop {
541 let offset = (tail >> SHIFT) % LAP;
542 if offset != BLOCK_CAP {
543 break;
544 }
545
546 backoff.spin_heavy();
550 tail = self.tail.index.load(Ordering::Acquire);
551 }
552
553 let mut head = self.head.index.load(Ordering::Acquire);
554 let mut block = self.head.block.swap(ptr::null_mut(), Ordering::AcqRel);
558
559 if head >> SHIFT != tail >> SHIFT {
561 while block.is_null() {
566 backoff.spin_heavy();
567 block = self.head.block.load(Ordering::Acquire);
568 }
569 }
570
571 unsafe {
572 while head >> SHIFT != tail >> SHIFT {
574 let offset = (head >> SHIFT) % LAP;
575
576 if offset < BLOCK_CAP {
577 let slot = (*block).slots.get_unchecked(offset);
579 slot.wait_write();
580 let p = &mut *slot.msg.get();
581 p.as_mut_ptr().drop_in_place();
582 } else {
583 (*block).wait_next();
584 let next = (*block).next.load(Ordering::Acquire);
586 drop(Box::from_raw(block));
587 block = next;
588 }
589
590 head = head.wrapping_add(1 << SHIFT);
591 }
592
593 if !block.is_null() {
595 drop(Box::from_raw(block));
596 }
597 }
598
599 head &= !MARK_BIT;
600 self.head.index.store(head, Ordering::Release);
601 }
602
603 pub(crate) fn is_disconnected(&self) -> bool {
605 self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
606 }
607
608 pub(crate) fn is_empty(&self) -> bool {
610 let head = self.head.index.load(Ordering::SeqCst);
611 let tail = self.tail.index.load(Ordering::SeqCst);
612 head >> SHIFT == tail >> SHIFT
613 }
614
615 pub(crate) fn is_full(&self) -> bool {
617 false
618 }
619}
620
621impl<T> Drop for Channel<T> {
622 fn drop(&mut self) {
623 let mut head = self.head.index.load(Ordering::Relaxed);
624 let mut tail = self.tail.index.load(Ordering::Relaxed);
625 let mut block = self.head.block.load(Ordering::Relaxed);
626
627 head &= !((1 << SHIFT) - 1);
629 tail &= !((1 << SHIFT) - 1);
630
631 unsafe {
632 while head != tail {
634 let offset = (head >> SHIFT) % LAP;
635
636 if offset < BLOCK_CAP {
637 let slot = (*block).slots.get_unchecked(offset);
639 let p = &mut *slot.msg.get();
640 p.as_mut_ptr().drop_in_place();
641 } else {
642 let next = (*block).next.load(Ordering::Relaxed);
644 drop(Box::from_raw(block));
645 block = next;
646 }
647
648 head = head.wrapping_add(1 << SHIFT);
649 }
650
651 if !block.is_null() {
653 drop(Box::from_raw(block));
654 }
655 }
656 }
657}