diff --git a/fixed_fifo_queue.go b/fixed_fifo_queue.go index 3fef4c5..b1f6033 100644 --- a/fixed_fifo_queue.go +++ b/fixed_fifo_queue.go @@ -1,6 +1,8 @@ package goconcurrentqueue -import "context" +import ( + "context" +) // Fixed capacity FIFO (First In First Out) concurrent queue type FixedFIFO struct { @@ -35,11 +37,11 @@ func (st *FixedFIFO) Enqueue(value interface{}) error { // verify whether it is possible to notify the listener (it could be the listener is no longer // available because the context expired: DequeueOrWaitForNextElementContext) select { - // sends the element through the listener's channel instead of enqueueing it - case listener <- value: - default: - // push the element into the queue instead of sending it through the listener's channel (which is not available at this moment) - return st.enqueueIntoQueue(value) + // sends the element through the listener's channel instead of enqueueing it + case listener <- value: + default: + // push the element into the queue instead of sending it through the listener's channel (which is not available at this moment) + return st.enqueueIntoQueue(value) } default: @@ -114,6 +116,12 @@ func (st *FixedFIFO) DequeueOrWaitForNextElementContext(ctx context.Context) (in return item, nil case <-ctx.Done(): return nil, ctx.Err() + // try again to get the element from the regular queue (in case waitChan doesn't provide any item) + case value, ok := <-st.queue: + if ok { + return value, nil + } + return nil, NewQueueError(QueueErrorCodeInternalChannelClosed, "internal channel is closed") } default: // too many watchers (waitForNextElementChanCapacity) enqueued waiting for next elements diff --git a/fixed_fifo_queue_test.go b/fixed_fifo_queue_test.go index 9b44606..15a1ca7 100644 --- a/fixed_fifo_queue_test.go +++ b/fixed_fifo_queue_test.go @@ -81,7 +81,7 @@ func (suite *FixedFIFOTestSuite) TestEnqueueFullCapacitySingleGR() { func (suite *FixedFIFOTestSuite) TestEnqueueListenerToExpireSingleGR() { var ( uselessChan = make(chan interface{}) - value = "my-test-value" + value = "my-test-value" ) // let Enqueue knows there is a channel to send the next item instead of enqueueing it into the queue @@ -98,6 +98,7 @@ func (suite *FixedFIFOTestSuite) TestEnqueueListenerToExpireSingleGR() { // TestEnqueueLenMultipleGR enqueues elements concurrently // // Detailed steps: +// // 1 - Enqueue totalGRs concurrently (from totalGRs different GRs) // 2 - Verifies the len, it should be equal to totalGRs // 3 - Verifies that all elements from 0 to totalGRs were enqueued @@ -269,6 +270,7 @@ func (suite *FixedFIFOTestSuite) TestDequeueClosedChannelSingleGR() { // TestDequeueMultipleGRs dequeues elements concurrently // // Detailed steps: +// // 1 - Enqueues totalElementsToEnqueue consecutive integers // 2 - Dequeues totalElementsToDequeue concurrently from totalElementsToDequeue GRs // 3 - Verifies the final len, should be equal to totalElementsToEnqueue - totalElementsToDequeue @@ -376,6 +378,39 @@ func (suite *FixedFIFOTestSuite) TestDequeueOrWaitForNextElementWithEmptyQueue() } } +// calling DequeueOrWaitForNextElement with empty queue, then adding an item directly into queue's internal channel +func (suite *FixedFIFOTestSuite) TestDequeueOrWaitForNextElementWithStuckWaitChan() { + var ( + dummyValue = "dummyValue" + doneChan = make(chan struct{}) + ) + + // consumer + go func(queue *FixedFIFO, expectedValue interface{}, done chan struct{}) { + item, err := queue.DequeueOrWaitForNextElement() + suite.NoError(err) + suite.Equal(expectedValue, item) + + done <- struct{}{} + }(suite.fifo, dummyValue, doneChan) + + // a second should be enough for the consumer to start consuming ... + time.Sleep(time.Second) + + // add an item (enqueue) directly into queue's internal channel + suite.fifo.queue <- dummyValue + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + select { + case <-doneChan: + + case <-ctx.Done(): + suite.Fail("too much time waiting ...") + } +} + // single GR calling DequeueOrWaitForNextElement (WaitForNextElementChanCapacity + 1) times, last one should return error func (suite *FixedFIFOTestSuite) TestDequeueOrWaitForNextElementWithFullWaitingChannel() { // enqueue WaitForNextElementChanCapacity listeners to future enqueued elements @@ -554,4 +589,4 @@ func (suite *FixedFIFOTestSuite) TestContextAlreadyCanceled() { case <-time.After(2 * time.Second): suite.Fail("DequeueOrWaitForNextElementContext did not return immediately after context was canceled") } -} \ No newline at end of file +} diff --git a/readme.md b/readme.md index 0ca91b6..8e8b77d 100644 --- a/readme.md +++ b/readme.md @@ -1,4 +1,4 @@ -[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white)](https://pkg.go.dev/mod/github.com/enriquebris/goconcurrentqueue) [![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/enriquebris/goconcurrentqueue) ![version](https://img.shields.io/badge/version-v0.6.3-yellowgreen.svg?style=flat "goconcurrentqueue v0.6.3") [![Build Status](https://api.travis-ci.org/enriquebris/goconcurrentqueue.svg?branch=master)](https://travis-ci.org/enriquebris/goconcurrentqueue) [![Go Report Card](https://goreportcard.com/badge/github.com/enriquebris/goconcurrentqueue)](https://goreportcard.com/report/github.com/enriquebris/goconcurrentqueue) [![codecov](https://codecov.io/gh/enriquebris/goconcurrentqueue/branch/master/graph/badge.svg)](https://codecov.io/gh/enriquebris/goconcurrentqueue) [![CodeFactor](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue/badge)](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue) [![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go) +[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white)](https://pkg.go.dev/mod/github.com/enriquebris/goconcurrentqueue) [![godoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://godoc.org/github.com/enriquebris/goconcurrentqueue) ![version](https://img.shields.io/badge/version-v0.7.0-yellowgreen.svg?style=flat "goconcurrentqueue v0.7.0") [![Build Status](https://api.travis-ci.org/enriquebris/goconcurrentqueue.svg?branch=master)](https://travis-ci.org/enriquebris/goconcurrentqueue) [![Go Report Card](https://goreportcard.com/badge/github.com/enriquebris/goconcurrentqueue)](https://goreportcard.com/report/github.com/enriquebris/goconcurrentqueue) [![codecov](https://codecov.io/gh/enriquebris/goconcurrentqueue/branch/master/graph/badge.svg)](https://codecov.io/gh/enriquebris/goconcurrentqueue) [![CodeFactor](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue/badge)](https://www.codefactor.io/repository/github/enriquebris/goconcurrentqueue) [![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go) # goconcurrentqueue - Concurrent safe queues The package goconcurrentqueue offers a public interface Queue with methods for a [queue](https://en.wikipedia.org/wiki/Queue_(abstract_data_type)). @@ -22,18 +22,7 @@ Execute go get github.com/enriquebris/goconcurrentqueue ``` -This package is compatible with the following golang versions: - - 1.7.x - - 1.8.x - - 1.9.x - - 1.10.x - - 1.11.x - - 1.12.x - - 1.13.x - - 1.14.x - - 1.15.x - - 1.16.x - - 1.17.x +This package is compatible with all golang versions >= 1.7.x ## Documentation Visit [goconcurrentqueue at go.dev](https://pkg.go.dev/mod/github.com/enriquebris/goconcurrentqueue) @@ -245,6 +234,10 @@ func workWithQueue(queue goconcurrentqueue.Queue) error { ## History +### v0.7.0 + +- Prevents FIFO.DequeueOrWaitForNextElement to keep waiting for a waitChan while internal queues contain items + ### v0.6.3 - Prevents FIFO.DequeueOrWaitForNextElement to add useless wait channels