supervisor_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553
  1. package supervisor
  2. import (
  3. "context"
  4. "fmt"
  5. "testing"
  6. "time"
  7. "go.uber.org/zap"
  8. )
  9. func runnableBecomesHealthy(healthy, done chan struct{}) Runnable {
  10. return func(ctx context.Context) error {
  11. Signal(ctx, SignalHealthy)
  12. go func() {
  13. if healthy != nil {
  14. healthy <- struct{}{}
  15. }
  16. }()
  17. <-ctx.Done()
  18. go func() {
  19. if done != nil {
  20. done <- struct{}{}
  21. }
  22. }()
  23. return ctx.Err()
  24. }
  25. }
  26. func runnableSpawnsMore(healthy, done chan struct{}, levels int) Runnable {
  27. return func(ctx context.Context) error {
  28. if levels > 0 {
  29. err := RunGroup(ctx, map[string]Runnable{
  30. "a": runnableSpawnsMore(nil, nil, levels-1),
  31. "b": runnableSpawnsMore(nil, nil, levels-1),
  32. })
  33. if err != nil {
  34. return err
  35. }
  36. }
  37. Signal(ctx, SignalHealthy)
  38. go func() {
  39. if healthy != nil {
  40. healthy <- struct{}{}
  41. }
  42. }()
  43. <-ctx.Done()
  44. go func() {
  45. if done != nil {
  46. done <- struct{}{}
  47. }
  48. }()
  49. return ctx.Err()
  50. }
  51. }
  type (
  	// rc is a Remote Controlled runnable. It is a generic runnable used for testing the supervisor.
  	//
  	// Tests drive it through the methods below. Every method sends a request on
  	// req, which newRC creates unbuffered, so each call blocks until a running
  	// instance of rc.runnable receives the request.
  	rc struct {
  		req chan rcRunnableRequest
  	}
  
  	// rcRunnableRequest is one command sent to the runnable. stateC is only set
  	// for rcRunnableCommandState and receives the reply.
  	rcRunnableRequest struct {
  		cmd    rcRunnableCommand
  		stateC chan rcRunnableState
  	}
  
  	// rcRunnableCommand selects what the runnable does with a request.
  	rcRunnableCommand int
  
  	// rcRunnableState is the state the runnable reports for rcRunnableCommandState.
  	rcRunnableState int
  )
  
  // Commands understood by rc.runnable, numbered by iota in this order.
  const (
  	rcRunnableCommandBecomeHealthy rcRunnableCommand = iota
  	rcRunnableCommandBecomeDone
  	rcRunnableCommandDie
  	rcRunnableCommandPanic
  	rcRunnableCommandState
  )
  
  // States reported by rc.runnable, numbered by iota in this order.
  const (
  	rcRunnableStateNew rcRunnableState = iota
  	rcRunnableStateHealthy
  	rcRunnableStateDone
  )
  
  // newRC returns an rc with an unbuffered request channel.
  func newRC() *rc {
  	reqs := make(chan rcRunnableRequest)
  	return &rc{req: reqs}
  }
  
  // sendCommand delivers a command that expects no reply, blocking until it is received.
  func (r *rc) sendCommand(cmd rcRunnableCommand) {
  	r.req <- rcRunnableRequest{cmd: cmd}
  }
  
  // becomeHealthy asks the runnable to signal SignalHealthy.
  func (r *rc) becomeHealthy() { r.sendCommand(rcRunnableCommandBecomeHealthy) }
  
  // becomeDone asks the runnable to signal SignalDone.
  func (r *rc) becomeDone() { r.sendCommand(rcRunnableCommandBecomeDone) }
  
  // die asks the runnable to return an error.
  func (r *rc) die() { r.sendCommand(rcRunnableCommandDie) }
  
  // panic asks the runnable to panic.
  func (r *rc) panic() { r.sendCommand(rcRunnableCommandPanic) }
  
  // state asks the runnable for its current state and waits for the reply.
  func (r *rc) state() rcRunnableState {
  	reply := make(chan rcRunnableState)
  	r.req <- rcRunnableRequest{cmd: rcRunnableCommandState, stateC: reply}
  	return <-reply
  }
  
  // waitState blocks until the runnable reports state s, re-checking every 10ms.
  // It has no timeout of its own.
  func (r *rc) waitState(s rcRunnableState) {
  	// This is poll based. Making it non-poll based would make the RC runnable logic a bit more complex for little gain.
  	for r.state() != s {
  		time.Sleep(10 * time.Millisecond)
  	}
  }
  109. // Remote Controlled Runnable
  110. func (r *rc) runnable() Runnable {
  111. return func(ctx context.Context) error {
  112. state := rcRunnableStateNew
  113. for {
  114. select {
  115. case <-ctx.Done():
  116. return ctx.Err()
  117. case r := <-r.req:
  118. switch r.cmd {
  119. case rcRunnableCommandBecomeHealthy:
  120. Signal(ctx, SignalHealthy)
  121. state = rcRunnableStateHealthy
  122. case rcRunnableCommandBecomeDone:
  123. Signal(ctx, SignalDone)
  124. state = rcRunnableStateDone
  125. case rcRunnableCommandDie:
  126. return fmt.Errorf("died on request")
  127. case rcRunnableCommandPanic:
  128. panic("at the disco")
  129. case rcRunnableCommandState:
  130. r.stateC <- state
  131. }
  132. }
  133. }
  134. }
  135. }
  136. func TestSimple(t *testing.T) {
  137. h1 := make(chan struct{})
  138. d1 := make(chan struct{})
  139. h2 := make(chan struct{})
  140. d2 := make(chan struct{})
  141. log, _ := zap.NewDevelopment()
  142. ctx, ctxC := context.WithCancel(context.Background())
  143. defer ctxC()
  144. s := New(ctx, log, func(ctx context.Context) error {
  145. err := RunGroup(ctx, map[string]Runnable{
  146. "one": runnableBecomesHealthy(h1, d1),
  147. "two": runnableBecomesHealthy(h2, d2),
  148. })
  149. if err != nil {
  150. return err
  151. }
  152. Signal(ctx, SignalHealthy)
  153. Signal(ctx, SignalDone)
  154. return nil
  155. }, WithPropagatePanic)
  156. // Expect both to start running.
  157. s.waitSettleError(ctx, t)
  158. select {
  159. case <-h1:
  160. default:
  161. t.Fatalf("runnable 'one' didn't start")
  162. }
  163. select {
  164. case <-h2:
  165. default:
  166. t.Fatalf("runnable 'one' didn't start")
  167. }
  168. }
  169. func TestSimpleFailure(t *testing.T) {
  170. h1 := make(chan struct{})
  171. d1 := make(chan struct{})
  172. two := newRC()
  173. log, _ := zap.NewDevelopment()
  174. ctx, ctxC := context.WithTimeout(context.Background(), 10*time.Second)
  175. defer ctxC()
  176. s := New(ctx, log, func(ctx context.Context) error {
  177. err := RunGroup(ctx, map[string]Runnable{
  178. "one": runnableBecomesHealthy(h1, d1),
  179. "two": two.runnable(),
  180. })
  181. if err != nil {
  182. return err
  183. }
  184. Signal(ctx, SignalHealthy)
  185. Signal(ctx, SignalDone)
  186. return nil
  187. }, WithPropagatePanic)
  188. s.waitSettleError(ctx, t)
  189. two.becomeHealthy()
  190. s.waitSettleError(ctx, t)
  191. // Expect one to start running.
  192. select {
  193. case <-h1:
  194. default:
  195. t.Fatalf("runnable 'one' didn't start")
  196. }
  197. // Kill off two, one should restart.
  198. two.die()
  199. s.waitSettleError(ctx, t)
  200. select {
  201. case <-d1:
  202. default:
  203. t.Fatalf("runnable 'one' didn't acknowledge cancel")
  204. }
  205. // And one should start running again.
  206. s.waitSettleError(ctx, t)
  207. select {
  208. case <-h1:
  209. default:
  210. t.Fatalf("runnable 'one' didn't restart")
  211. }
  212. }
  213. func TestDeepFailure(t *testing.T) {
  214. h1 := make(chan struct{})
  215. d1 := make(chan struct{})
  216. two := newRC()
  217. log, _ := zap.NewDevelopment()
  218. ctx, ctxC := context.WithTimeout(context.Background(), 10*time.Second)
  219. defer ctxC()
  220. s := New(ctx, log, func(ctx context.Context) error {
  221. err := RunGroup(ctx, map[string]Runnable{
  222. "one": runnableSpawnsMore(h1, d1, 5),
  223. "two": two.runnable(),
  224. })
  225. if err != nil {
  226. return err
  227. }
  228. Signal(ctx, SignalHealthy)
  229. Signal(ctx, SignalDone)
  230. return nil
  231. }, WithPropagatePanic)
  232. two.becomeHealthy()
  233. s.waitSettleError(ctx, t)
  234. // Expect one to start running.
  235. select {
  236. case <-h1:
  237. default:
  238. t.Fatalf("runnable 'one' didn't start")
  239. }
  240. // Kill off two, one should restart.
  241. two.die()
  242. s.waitSettleError(ctx, t)
  243. select {
  244. case <-d1:
  245. default:
  246. t.Fatalf("runnable 'one' didn't acknowledge cancel")
  247. }
  248. // And one should start running again.
  249. s.waitSettleError(ctx, t)
  250. select {
  251. case <-h1:
  252. default:
  253. t.Fatalf("runnable 'one' didn't restart")
  254. }
  255. }
  256. func TestPanic(t *testing.T) {
  257. h1 := make(chan struct{})
  258. d1 := make(chan struct{})
  259. two := newRC()
  260. log, _ := zap.NewDevelopment()
  261. ctx, ctxC := context.WithCancel(context.Background())
  262. defer ctxC()
  263. s := New(ctx, log, func(ctx context.Context) error {
  264. err := RunGroup(ctx, map[string]Runnable{
  265. "one": runnableBecomesHealthy(h1, d1),
  266. "two": two.runnable(),
  267. })
  268. if err != nil {
  269. return err
  270. }
  271. Signal(ctx, SignalHealthy)
  272. Signal(ctx, SignalDone)
  273. return nil
  274. })
  275. two.becomeHealthy()
  276. s.waitSettleError(ctx, t)
  277. // Expect one to start running.
  278. select {
  279. case <-h1:
  280. default:
  281. t.Fatalf("runnable 'one' didn't start")
  282. }
  283. // Kill off two, one should restart.
  284. two.panic()
  285. s.waitSettleError(ctx, t)
  286. select {
  287. case <-d1:
  288. default:
  289. t.Fatalf("runnable 'one' didn't acknowledge cancel")
  290. }
  291. // And one should start running again.
  292. s.waitSettleError(ctx, t)
  293. select {
  294. case <-h1:
  295. default:
  296. t.Fatalf("runnable 'one' didn't restart")
  297. }
  298. }
  299. func TestMultipleLevelFailure(t *testing.T) {
  300. log, _ := zap.NewDevelopment()
  301. ctx, ctxC := context.WithCancel(context.Background())
  302. defer ctxC()
  303. New(ctx, log, func(ctx context.Context) error {
  304. err := RunGroup(ctx, map[string]Runnable{
  305. "one": runnableSpawnsMore(nil, nil, 4),
  306. "two": runnableSpawnsMore(nil, nil, 4),
  307. })
  308. if err != nil {
  309. return err
  310. }
  311. Signal(ctx, SignalHealthy)
  312. Signal(ctx, SignalDone)
  313. return nil
  314. }, WithPropagatePanic)
  315. }
  316. func TestBackoff(t *testing.T) {
  317. one := newRC()
  318. log, _ := zap.NewDevelopment()
  319. ctx, ctxC := context.WithTimeout(context.Background(), 20*time.Second)
  320. defer ctxC()
  321. s := New(ctx, log, func(ctx context.Context) error {
  322. if err := Run(ctx, "one", one.runnable()); err != nil {
  323. return err
  324. }
  325. Signal(ctx, SignalHealthy)
  326. Signal(ctx, SignalDone)
  327. return nil
  328. }, WithPropagatePanic)
  329. one.becomeHealthy()
  330. // Die a bunch of times in a row, this brings up the next exponential backoff to over a second.
  331. for i := 0; i < 4; i += 1 {
  332. one.die()
  333. one.waitState(rcRunnableStateNew)
  334. }
  335. // Measure how long it takes for the runnable to respawn after a number of failures
  336. start := time.Now()
  337. one.die()
  338. one.becomeHealthy()
  339. one.waitState(rcRunnableStateHealthy)
  340. taken := time.Since(start)
  341. if taken < 1*time.Second {
  342. t.Errorf("Runnable took %v to restart, wanted at least a second from backoff", taken)
  343. }
  344. s.waitSettleError(ctx, t)
  345. // Now that we've become healthy, die again. Becoming healthy resets the backoff.
  346. start = time.Now()
  347. one.die()
  348. one.becomeHealthy()
  349. one.waitState(rcRunnableStateHealthy)
  350. taken = time.Since(start)
  351. if taken > 1*time.Second || taken < 100*time.Millisecond {
  352. t.Errorf("Runnable took %v to restart, wanted at least 100ms from backoff and at most 1s from backoff reset", taken)
  353. }
  354. }
  355. // TestResilience throws some curveballs at the supervisor - either programming errors or high load. It then ensures
  356. // that another runnable is running, and that it restarts on its sibling failure.
  357. func TestResilience(t *testing.T) {
  358. // request/response channel for testing liveness of the 'one' runnable
  359. req := make(chan chan struct{})
  360. // A runnable that responds on the 'req' channel.
  361. one := func(ctx context.Context) error {
  362. Signal(ctx, SignalHealthy)
  363. for {
  364. select {
  365. case <-ctx.Done():
  366. return ctx.Err()
  367. case r := <-req:
  368. r <- struct{}{}
  369. }
  370. }
  371. }
  372. oneSibling := newRC()
  373. oneTest := func() {
  374. timeout := time.NewTicker(1000 * time.Millisecond)
  375. ping := make(chan struct{})
  376. req <- ping
  377. select {
  378. case <-ping:
  379. case <-timeout.C:
  380. t.Fatalf("one ping response timeout")
  381. }
  382. timeout.Stop()
  383. }
  384. // A nasty runnable that calls Signal with the wrong context (this is a programming error)
  385. two := func(ctx context.Context) error {
  386. Signal(context.TODO(), SignalHealthy)
  387. return nil
  388. }
  389. // A nasty runnable that calls Signal wrong (this is a programming error).
  390. three := func(ctx context.Context) error {
  391. Signal(ctx, SignalDone)
  392. return nil
  393. }
  394. // A nasty runnable that runs in a busy loop (this is a programming error).
  395. four := func(ctx context.Context) error {
  396. for {
  397. time.Sleep(0)
  398. }
  399. }
  400. // A nasty runnable that keeps creating more runnables.
  401. five := func(ctx context.Context) error {
  402. i := 1
  403. for {
  404. err := Run(ctx, fmt.Sprintf("r%d", i), runnableSpawnsMore(nil, nil, 2))
  405. if err != nil {
  406. return err
  407. }
  408. time.Sleep(100 * time.Millisecond)
  409. i += 1
  410. }
  411. }
  412. log, _ := zap.NewDevelopment()
  413. ctx, ctxC := context.WithCancel(context.Background())
  414. defer ctxC()
  415. New(ctx, log, func(ctx context.Context) error {
  416. RunGroup(ctx, map[string]Runnable{
  417. "one": one,
  418. "oneSibling": oneSibling.runnable(),
  419. })
  420. rs := map[string]Runnable{
  421. "two": two, "three": three, "four": four, "five": five,
  422. }
  423. for k, v := range rs {
  424. if err := Run(ctx, k, v); err != nil {
  425. return err
  426. }
  427. }
  428. Signal(ctx, SignalHealthy)
  429. Signal(ctx, SignalDone)
  430. return nil
  431. })
  432. // Five rounds of letting one run, then restarting it.
  433. for i := 0; i < 5; i += 1 {
  434. oneSibling.becomeHealthy()
  435. oneSibling.waitState(rcRunnableStateHealthy)
  436. // 'one' should work for at least a second.
  437. deadline := time.Now().Add(1 * time.Second)
  438. for {
  439. if time.Now().After(deadline) {
  440. break
  441. }
  442. oneTest()
  443. }
  444. // Killing 'oneSibling' should restart one.
  445. oneSibling.panic()
  446. }
  447. // Make sure 'one' is still okay.
  448. oneTest()
  449. }
  450. func ExampleNew() {
  451. // Minimal runnable that is immediately done.
  452. childC := make(chan struct{})
  453. child := func(ctx context.Context) error {
  454. Signal(ctx, SignalHealthy)
  455. close(childC)
  456. Signal(ctx, SignalDone)
  457. return nil
  458. }
  459. log, _ := zap.NewDevelopment()
  460. // Start a supervision tree with a root runnable.
  461. ctx, ctxC := context.WithCancel(context.Background())
  462. defer ctxC()
  463. New(ctx, log, func(ctx context.Context) error {
  464. err := Run(ctx, "child", child)
  465. if err != nil {
  466. return fmt.Errorf("could not run 'child': %w", err)
  467. }
  468. Signal(ctx, SignalHealthy)
  469. t := time.NewTicker(time.Second)
  470. defer t.Stop()
  471. // Do something in the background, and exit on context cancel.
  472. for {
  473. select {
  474. case <-t.C:
  475. fmt.Printf("tick!")
  476. case <-ctx.Done():
  477. return ctx.Err()
  478. }
  479. }
  480. })
  481. // root.child will close this channel.
  482. <-childC
  483. }