[SOLVED] How to address deadlock in producer and consumer code

Issue

This content is from Stack Overflow. Question asked by drdot.

When I ran the program below, I got this error:

davecheney      tweets about golang
beertocode      does not tweet about golang
ironzeb         tweets about golang
beertocode      tweets about golang
vampirewalk666  tweets about golang
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc000010260?)
        /usr/local/go/src/runtime/sema.go:56 +0x25
sync.(*WaitGroup).Wait(0x100c000058058?)
        /usr/local/go/src/sync/waitgroup.go:136 +0x52
main.main()
        /home/joe/go/src/github.com/go-concurrency-exercises/1-producer-consumer/main.go:53 +0x14f

Where is the deadlock coming from, and how can I change the program to avoid it?

package main

import (
    "fmt"
    "sync"
    "time"
)

func producer(stream Stream, tweetChan chan *Tweet) {
    for {
        tweet, err := stream.Next()
        if err == ErrEOF {
            close(tweetChan)
            return
        }
        tweetChan <- tweet
        //tweets = append(tweets, tweet)
    }
}

func consumer(tweetChan chan *Tweet) {
    for t := range tweetChan {
        if t.IsTalkingAboutGo() {
            fmt.Println(t.Username, "\ttweets about golang")
        } else {
            fmt.Println(t.Username, "\tdoes not tweet about golang")
        }
    }
}

func main() {
    start := time.Now()
    stream := GetMockStream()

    var wg sync.WaitGroup
    tweetChan := make(chan *Tweet)
    // Producer
    //tweets := producer(stream)
    wg.Add(2)
    go producer(stream, tweetChan)
    // Consumer
    //consumer(tweets)
    go consumer(tweetChan)

    wg.Wait()

    fmt.Printf("Process took %s\n", time.Since(start))
}

If you need to see mockstream.go, refer to
https://github.com/loong/go-concurrency-exercises/tree/master/1-producer-consumer

My program is a concurrent version of the original program, made by modifying main.go.



Solution

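The call to wg.Wait() blocks until the group's counter reaches zero. main sets the counter to 2 with wg.Add(2), but neither goroutine ever calls wg.Done(), so the counter never decreases. Once the producer and the consumer have both returned, main is the only goroutine left and it is still blocked in Wait, which the runtime reports as a deadlock.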

Fix by calling wg.Done() before returning from the goroutine functions:

func producer(wg *sync.WaitGroup, stream Stream, tweetChan chan *Tweet) {
    defer wg.Done()
    for {
        tweet, err := stream.Next()
        if err == ErrEOF {
            close(tweetChan)
            return
        }
        tweetChan <- tweet
    }
}

func consumer(wg *sync.WaitGroup, tweetChan chan *Tweet) {
    defer wg.Done()
    for t := range tweetChan {
        if t.IsTalkingAboutGo() {
            fmt.Println(t.Username, "\ttweets about golang")
        } else {
            fmt.Println(t.Username, "\tdoes not tweet about golang")
        }
    }
}

func main() {
    start := time.Now()
    stream := GetMockStream()
    var wg sync.WaitGroup
    tweetChan := make(chan *Tweet)
    wg.Add(2)
    go producer(&wg, stream, tweetChan)
    go consumer(&wg, tweetChan)
    wg.Wait()
    fmt.Printf("Process took %s\n", time.Since(start))
}
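
The fix above depends on Stream, Tweet, ErrEOF and GetMockStream from the exercise's mockstream.go, so it cannot be run on its own. The following is a minimal self-contained sketch of the same pattern. The Tweet, Stream and ErrEOF definitions here are simplified stand-ins invented for illustration, not the real ones from the repository, and run is a hypothetical helper that collects output lines instead of printing them directly.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"sync"
)

// Tweet, Stream and ErrEOF are simplified stand-ins (assumptions) for the
// types defined in the exercise's mockstream.go.
type Tweet struct {
	Username string
	Text     string
}

// IsTalkingAboutGo reports whether the tweet text mentions "golang".
func (t *Tweet) IsTalkingAboutGo() bool {
	return strings.Contains(strings.ToLower(t.Text), "golang")
}

var ErrEOF = errors.New("end of stream")

type Stream struct {
	pos    int
	tweets []Tweet
}

// Next returns the next tweet, or ErrEOF once the stream is exhausted.
func (s *Stream) Next() (*Tweet, error) {
	if s.pos >= len(s.tweets) {
		return nil, ErrEOF
	}
	t := &s.tweets[s.pos]
	s.pos++
	return t, nil
}

// producer sends every tweet on tweetChan, then closes the channel and
// signals the WaitGroup. Deferred calls run in reverse order, so the
// channel is closed before wg.Done is called.
func producer(wg *sync.WaitGroup, stream *Stream, tweetChan chan<- *Tweet) {
	defer wg.Done()
	defer close(tweetChan)
	for {
		tweet, err := stream.Next()
		if errors.Is(err, ErrEOF) {
			return
		}
		tweetChan <- tweet
	}
}

// consumer drains tweetChan until it is closed, then signals the WaitGroup.
func consumer(wg *sync.WaitGroup, tweetChan <-chan *Tweet, out *[]string) {
	defer wg.Done()
	for t := range tweetChan {
		if t.IsTalkingAboutGo() {
			*out = append(*out, t.Username+"\ttweets about golang")
		} else {
			*out = append(*out, t.Username+"\tdoes not tweet about golang")
		}
	}
}

// run starts one producer and one consumer and waits for both to finish.
func run(stream *Stream) []string {
	var wg sync.WaitGroup
	var lines []string
	tweetChan := make(chan *Tweet)
	wg.Add(2) // one count per goroutine; each must call wg.Done exactly once
	go producer(&wg, stream, tweetChan)
	go consumer(&wg, tweetChan, &lines)
	wg.Wait() // returns only after both goroutines have called wg.Done
	return lines
}

func main() {
	stream := &Stream{tweets: []Tweet{
		{Username: "davecheney", Text: "#golang top tip"},
		{Username: "beertocode", Text: "Backend developer"},
	}}
	for _, line := range run(stream) {
		fmt.Println(line)
	}
}
```

Using defer for wg.Done() means the counter is decremented on every return path, which is why the count passed to wg.Add matches the number of goroutines started. Only the consumer goroutine writes to lines, and main reads it after wg.Wait() returns, so the slice needs no extra locking in this sketch.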


This question was asked on Stack Overflow by drdot and answered by Cerise Limón. It is licensed under the terms of CC BY-SA 2.5, CC BY-SA 3.0, CC BY-SA 4.0.
