sync.mx raw

   1  package runtime
   2  
   3  // Semaphore implementation for moxie's cooperative scheduler.
   4  // Used by internal/poll and internal/sync (which backs sync.Mutex).
   5  
   6  import (
   7  	"internal/task"
   8  	"sync/atomic"
   9  )
  10  
  11  // Waiter queue for semaphores. Maps sema address → linked list of waiting tasks.
  12  type semaWaiter struct {
  13  	t    *task.Task
  14  	addr *uint32
  15  	next *semaWaiter
  16  }
  17  
  18  var (
  19  	semaWaiters   *semaWaiter
  20  	semaPool      [256]semaWaiter
  21  	semaPoolCount int
  22  )
  23  
  24  func semaAlloc() *semaWaiter {
  25  	if semaPoolCount >= len(semaPool) {
  26  		runtimePanic("semaphore: too many waiters")
  27  	}
  28  	w := &semaPool[semaPoolCount]
  29  	semaPoolCount++
  30  	return w
  31  }
  32  
  33  // semacquire1 blocks until *addr > 0, then decrements it.
  34  func semacquire1(addr *uint32, lifo bool) {
  35  	for {
  36  		v := atomic.LoadUint32(addr)
  37  		if v > 0 {
  38  			if atomic.CompareAndSwapUint32(addr, v, v-1) {
  39  				return
  40  			}
  41  			continue
  42  		}
  43  		if !hasScheduler {
  44  			// Single-threaded: spin on event loop until released.
  45  			if hasPoll {
  46  				netpollBlock(1)
  47  			} else {
  48  				sleepTicks(nanosecondsToTicks(1_000_000))
  49  			}
  50  			fireTimers()
  51  			continue
  52  		}
  53  		// Park: add to wait queue and pause.
  54  		w := semaAlloc()
  55  		w.t = task.Current()
  56  		w.addr = addr
  57  		if lifo {
  58  			// Insert at front.
  59  			w.next = semaWaiters
  60  			semaWaiters = w
  61  		} else {
  62  			// Insert at back.
  63  			w.next = nil
  64  			if semaWaiters == nil {
  65  				semaWaiters = w
  66  			} else {
  67  				tail := semaWaiters
  68  				for tail.next != nil {
  69  					tail = tail.next
  70  				}
  71  				tail.next = w
  72  			}
  73  		}
  74  		task.Pause()
  75  		// Woken up — retry the acquire.
  76  	}
  77  }
  78  
  79  // semrelease1 increments *addr and wakes one waiting goroutine if any.
  80  func semrelease1(addr *uint32) {
  81  	atomic.AddUint32(addr, 1)
  82  
  83  	// Find and wake a waiter for this address.
  84  	var prev *semaWaiter
  85  	for w := semaWaiters; w != nil; w = w.next {
  86  		if w.addr == addr {
  87  			// Remove from list.
  88  			if prev == nil {
  89  				semaWaiters = w.next
  90  			} else {
  91  				prev.next = w.next
  92  			}
  93  			t := w.t
  94  			w.t = nil
  95  			w.addr = nil
  96  			w.next = nil
  97  			scheduleTask(t)
  98  			return
  99  		}
 100  		prev = w
 101  	}
 102  }
 103  
 104  // internal/poll semaphores (simple interface).
 105  
 106  //go:linkname poll_semacquire internal/poll.runtime_Semacquire
 107  func poll_semacquire(sema *uint32) {
 108  	semacquire1(sema, false)
 109  }
 110  
 111  //go:linkname poll_semrelease internal/poll.runtime_Semrelease
 112  func poll_semrelease(sema *uint32) {
 113  	semrelease1(sema)
 114  }
 115  
 116  // internal/sync semaphores (mutex interface with lifo/handoff).
 117  
 118  //go:linkname sync_runtime_SemacquireMutex internal/sync.runtime_SemacquireMutex
 119  func sync_runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int) {
 120  	semacquire1(s, lifo)
 121  }
 122  
 123  //go:linkname sync_runtime_Semrelease internal/sync.runtime_Semrelease
 124  func sync_runtime_Semrelease(s *uint32, handoff bool, skipframes int) {
 125  	semrelease1(s)
 126  }
 127