supervisor_node.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. package supervisor
  2. import (
  3. "context"
  4. "fmt"
  5. "regexp"
  6. "strings"
  7. "github.com/cenkalti/backoff/v4"
  8. "go.uber.org/zap"
  9. )
  10. // node is a supervision tree node. It represents the state of a Runnable within this tree, its relation to other tree
  11. // elements, and contains supporting data needed to actually supervise it.
  12. type node struct {
  13. // The name of this node. Opaque string. It's used to make up the 'dn' (distinguished name) of a node within
  14. // the tree. When starting a runnable inside a tree, this is where that name gets used.
  15. name string
  16. runnable Runnable
  17. // The supervisor managing this tree.
  18. sup *supervisor
  19. // The parent, within the tree, of this node. If this is the root node of the tree, this is nil.
  20. parent *node
  21. // Children of this tree. This is represented by a map keyed from child node names, for easy access.
  22. children map[string]*node
  23. // Supervision groups. Each group is a set of names of children. Sets, and as such groups, don't overlap between
  24. // each other. A supervision group indicates that if any child within that group fails, all others should be
  25. // canceled and restarted together.
  26. groups []map[string]bool
  27. // The current state of the runnable in this node.
  28. state nodeState
  29. // Backoff used to keep runnables from being restarted too fast.
  30. bo *backoff.ExponentialBackOff
  31. // Context passed to the runnable, and its cancel function.
  32. ctx context.Context
  33. ctxC context.CancelFunc
  34. }
  35. // nodeState is the state of a runnable within a node, and in a way the node itself.
  36. // This follows the state diagram from go/supervision.
  37. type nodeState int
  38. const (
  39. // A node that has just been created, and whose runnable has been started already but hasn't signaled anything yet.
  40. nodeStateNew nodeState = iota
  41. // A node whose runnable has signaled being healthy - this means it's ready to serve/act.
  42. nodeStateHealthy
  43. // A node that has unexpectedly returned or panicked.
  44. nodeStateDead
  45. // A node that has declared that its done with its work and should not be restarted, unless a supervision tree
  46. // failure requires that.
  47. nodeStateDone
  48. // A node that has returned after being requested to cancel.
  49. nodeStateCanceled
  50. )
  51. func (s nodeState) String() string {
  52. switch s {
  53. case nodeStateNew:
  54. return "NODE_STATE_NEW"
  55. case nodeStateHealthy:
  56. return "NODE_STATE_HEALTHY"
  57. case nodeStateDead:
  58. return "NODE_STATE_DEAD"
  59. case nodeStateDone:
  60. return "NODE_STATE_DONE"
  61. case nodeStateCanceled:
  62. return "NODE_STATE_CANCELED"
  63. }
  64. return "UNKNOWN"
  65. }
  66. func (n *node) String() string {
  67. return fmt.Sprintf("%s (%s)", n.dn(), n.state.String())
  68. }
  69. // contextKey is a type used to keep data within context values.
  70. type contextKey string
  71. var (
  72. supervisorKey = contextKey("supervisor")
  73. dnKey = contextKey("dn")
  74. )
  75. // fromContext retrieves a tree node from a runnable context. It takes a lock on the tree and returns an unlock
  76. // function. This unlock function needs to be called once mutations on the tree/supervisor/node are done.
  77. func fromContext(ctx context.Context) (*node, func()) {
  78. sup, ok := ctx.Value(supervisorKey).(*supervisor)
  79. if !ok {
  80. panic("supervisor function called from non-runnable context")
  81. }
  82. sup.mu.Lock()
  83. dnParent, ok := ctx.Value(dnKey).(string)
  84. if !ok {
  85. sup.mu.Unlock()
  86. panic("supervisor function called from non-runnable context")
  87. }
  88. return sup.nodeByDN(dnParent), sup.mu.Unlock
  89. }
  90. // All the following 'internal' supervisor functions must only be called with the supervisor lock taken. Getting a lock
  91. // via fromContext is enough.
  92. // dn returns the distinguished name of a node. This distinguished name is a period-separated, inverse-DNS-like name.
  93. // For instance, the runnable 'foo' within the runnable 'bar' will be called 'root.bar.foo'. The root of the tree is
  94. // always named, and has the dn, 'root'.
  95. func (n *node) dn() string {
  96. if n.parent != nil {
  97. return fmt.Sprintf("%s.%s", n.parent.dn(), n.name)
  98. }
  99. return n.name
  100. }
  101. // groupSiblings is a helper function to get all runnable group siblings of a given runnable name within this node.
  102. // All children are always in a group, even if that group is unary.
  103. func (n *node) groupSiblings(name string) map[string]bool {
  104. for _, m := range n.groups {
  105. if _, ok := m[name]; ok {
  106. return m
  107. }
  108. }
  109. return nil
  110. }
  111. // newNode creates a new node with a given parent. It does not register it with the parent (as that depends on group
  112. // placement).
  113. func newNode(name string, runnable Runnable, sup *supervisor, parent *node) *node {
  114. // We use exponential backoff for failed runnables, but at some point we cap at a given backoff time.
  115. // To achieve this, we set MaxElapsedTime to 0, which will cap the backoff at MaxInterval.
  116. bo := backoff.NewExponentialBackOff()
  117. bo.MaxElapsedTime = 0
  118. n := &node{
  119. name: name,
  120. runnable: runnable,
  121. bo: bo,
  122. sup: sup,
  123. parent: parent,
  124. }
  125. n.reset()
  126. return n
  127. }
  128. // resetNode sets up all the dynamic fields of the node, in preparation of starting a runnable. It clears the node's
  129. // children, groups and resets its context.
  130. func (n *node) reset() {
  131. // Make new context. First, acquire parent context. For the root node that's Background, otherwise it's the
  132. // parent's context.
  133. var pCtx context.Context
  134. if n.parent == nil {
  135. pCtx = context.Background()
  136. } else {
  137. pCtx = n.parent.ctx
  138. }
  139. // Mark DN and supervisor in context.
  140. ctx := context.WithValue(pCtx, dnKey, n.dn())
  141. ctx = context.WithValue(ctx, supervisorKey, n.sup)
  142. ctx, ctxC := context.WithCancel(ctx)
  143. // Set context
  144. n.ctx = ctx
  145. n.ctxC = ctxC
  146. // Clear children and state
  147. n.state = nodeStateNew
  148. n.children = make(map[string]*node)
  149. n.groups = nil
  150. // The node is now ready to be scheduled.
  151. }
  152. // nodeByDN returns a node by given DN from the supervisor.
  153. func (s *supervisor) nodeByDN(dn string) *node {
  154. parts := strings.Split(dn, ".")
  155. if parts[0] != "root" {
  156. panic("DN does not start with root.")
  157. }
  158. parts = parts[1:]
  159. cur := s.root
  160. for {
  161. if len(parts) == 0 {
  162. return cur
  163. }
  164. next, ok := cur.children[parts[0]]
  165. if !ok {
  166. panic(fmt.Errorf("could not find %v (%s) in %s", parts, dn, cur))
  167. }
  168. cur = next
  169. parts = parts[1:]
  170. }
  171. }
  172. // reNodeName validates a node name against constraints.
  173. var reNodeName = regexp.MustCompile(`[a-z90-9_]{1,64}`)
  174. // runGroup schedules a new group of runnables to run on a node.
  175. func (n *node) runGroup(runnables map[string]Runnable) error {
  176. // Check that the parent node is in the right state.
  177. if n.state != nodeStateNew {
  178. return fmt.Errorf("cannot run new runnable on non-NEW node")
  179. }
  180. // Check the requested runnable names.
  181. for name, _ := range runnables {
  182. if !reNodeName.MatchString(name) {
  183. return fmt.Errorf("runnable name %q is invalid", name)
  184. }
  185. if _, ok := n.children[name]; ok {
  186. return fmt.Errorf("runnable %q already exists", name)
  187. }
  188. }
  189. // Create child nodes.
  190. dns := make(map[string]string)
  191. group := make(map[string]bool)
  192. for name, runnable := range runnables {
  193. if g := n.groupSiblings(name); g != nil {
  194. return fmt.Errorf("duplicate child name %q", name)
  195. }
  196. node := newNode(name, runnable, n.sup, n)
  197. n.children[name] = node
  198. dns[name] = node.dn()
  199. group[name] = true
  200. }
  201. // Add group.
  202. n.groups = append(n.groups, group)
  203. // Schedule execution of group members.
  204. go func() {
  205. for name, _ := range runnables {
  206. n.sup.pReq <- &processorRequest{
  207. schedule: &processorRequestSchedule{
  208. dn: dns[name],
  209. },
  210. }
  211. }
  212. }()
  213. return nil
  214. }
  215. // signal sequences state changes by signals received from runnables and updates a node's status accordingly.
  216. func (n *node) signal(signal SignalType) {
  217. switch signal {
  218. case SignalHealthy:
  219. if n.state != nodeStateNew {
  220. panic(fmt.Errorf("node %s signaled healthy", n))
  221. }
  222. n.state = nodeStateHealthy
  223. n.bo.Reset()
  224. case SignalDone:
  225. if n.state != nodeStateHealthy {
  226. panic(fmt.Errorf("node %s signaled done", n))
  227. }
  228. n.state = nodeStateDone
  229. n.bo.Reset()
  230. }
  231. }
  232. // getLogger creates a new logger for a given supervisor node, to be used by its runnable.
  233. func (n *node) getLogger() *zap.Logger {
  234. return n.sup.logger.Named(n.dn())
  235. }