channel_utils_test.go 3.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package common
  2. import (
  3. "context"
  4. "testing"
  5. "time"
  6. "github.com/stretchr/testify/assert"
  7. "github.com/stretchr/testify/require"
  8. )
  9. const myDelay = time.Millisecond * 100
  10. const myMaxSize = 2
  11. const myQueueSize = myMaxSize * 10
  12. func TestReadFromChannelWithTimeout_NoData(t *testing.T) {
  13. ctx := context.Background()
  14. myChan := make(chan int, myQueueSize)
  15. // No data should timeout.
  16. timeout, cancel := context.WithTimeout(ctx, myDelay)
  17. defer cancel()
  18. observations, err := ReadFromChannelWithTimeout[int](timeout, myChan, myMaxSize)
  19. assert.Equal(t, err, context.DeadlineExceeded)
  20. assert.Equal(t, 0, len(observations))
  21. }
  22. func TestReadFromChannelWithTimeout_SomeData(t *testing.T) {
  23. ctx := context.Background()
  24. myChan := make(chan int, myQueueSize)
  25. myChan <- 1
  26. // Some data but not enough to fill a message should timeout and return the data.
  27. timeout, cancel := context.WithTimeout(ctx, myDelay)
  28. defer cancel()
  29. observations, err := ReadFromChannelWithTimeout[int](timeout, myChan, myMaxSize)
  30. assert.Equal(t, err, context.DeadlineExceeded)
  31. require.Equal(t, 1, len(observations))
  32. assert.Equal(t, 1, observations[0])
  33. }
  34. func TestReadFromChannelWithTimeout_JustEnoughData(t *testing.T) {
  35. ctx := context.Background()
  36. myChan := make(chan int, myQueueSize)
  37. myChan <- 1
  38. myChan <- 2
  39. // Just enough data should return the data and no error.
  40. timeout, cancel := context.WithTimeout(ctx, myDelay)
  41. defer cancel()
  42. observations, err := ReadFromChannelWithTimeout[int](timeout, myChan, myMaxSize)
  43. assert.NoError(t, err)
  44. require.Equal(t, 2, len(observations))
  45. assert.Equal(t, 1, observations[0])
  46. assert.Equal(t, 2, observations[1])
  47. }
  48. func TestReadFromChannelWithTimeout_TooMuchData(t *testing.T) {
  49. ctx := context.Background()
  50. myChan := make(chan int, myQueueSize)
  51. myChan <- 1
  52. myChan <- 2
  53. myChan <- 3
  54. // If there is more data than will fit, it should immediately return a full message, then timeout and return the remainder.
  55. timeout, cancel := context.WithTimeout(ctx, myDelay)
  56. defer cancel()
  57. observations, err := ReadFromChannelWithTimeout[int](timeout, myChan, myMaxSize)
  58. assert.NoError(t, err)
  59. require.Equal(t, 2, len(observations))
  60. assert.Equal(t, 1, observations[0])
  61. assert.Equal(t, 2, observations[1])
  62. timeout2, cancel2 := context.WithTimeout(ctx, myDelay)
  63. defer cancel2()
  64. observations, err = ReadFromChannelWithTimeout[int](timeout2, myChan, myMaxSize)
  65. assert.Equal(t, err, context.DeadlineExceeded)
  66. require.Equal(t, 1, len(observations))
  67. assert.Equal(t, 3, observations[0])
  68. }
  69. func TestWriteToChannelWithoutBlocking(t *testing.T) {
  70. myChan := make(chan int, 1)
  71. assert.Equal(t, 0.0, getCounterValue(channelWriteDrops, "numbers"))
  72. WriteToChannelWithoutBlocking(myChan, 42, "numbers")
  73. assert.Equal(t, 0.0, getCounterValue(channelWriteDrops, "numbers"))
  74. WriteToChannelWithoutBlocking(myChan, 43, "numbers")
  75. assert.Equal(t, 1.0, getCounterValue(channelWriteDrops, "numbers"))
  76. WriteToChannelWithoutBlocking(myChan, 44, "numbers")
  77. assert.Equal(t, 2.0, getCounterValue(channelWriteDrops, "numbers"))
  78. WriteToChannelWithoutBlocking(myChan, 44, "different_label")
  79. assert.Equal(t, 1.0, getCounterValue(channelWriteDrops, "different_label"))
  80. assert.Equal(t, 2.0, getCounterValue(channelWriteDrops, "numbers"))
  81. }