Vraag Hoe sync.Cond correct te gebruiken?


Ik heb problemen met het uitzoeken hoe correct te gebruiken sync.Cond. Van wat ik kan vertellen, bestaat er een raceconditie tussen het vergrendelen van de Locker en het aanroepen van de Wait-methode van de voorwaarde. Dit voorbeeld voegt een kunstmatige vertraging toe tussen de twee lijnen in de hoofdgoroutine om de raceconditie te simuleren:

package main

import (
    "sync"
    "time"
)

func main() {
    m := sync.Mutex{}
    c := sync.NewCond(&m)
    go func() {
        time.Sleep(1 * time.Second)
        c.Broadcast()
    }()
    m.Lock()
    time.Sleep(2 * time.Second)
    c.Wait()
}

[Ren op de Go Playground]

Dit veroorzaakt onmiddellijk paniek:

fatale fout: alle goroutines slapen - impasse!

goroutine 1 [semacquire]:
sync.runtime_Syncsemacquire (0x10330208, 0x1)
    /usr/local/go/src/runtime/sema.go:241 + 0x2e0
sync. (* Cond) .Wachten (0x10330200, 0x0)
    /usr/local/go/src/sync/cond.go:63 + 0xe0
main.main ()
    /tmp/sandbox301865429/main.go:17 + 0x1a0

Wat doe ik verkeerd? Hoe vermijd ik deze schijnbare raceconditie? Is er een beter synchronisatieconstruct dat ik zou moeten gebruiken?


Bewerk: Ik realiseer me dat ik het probleem dat ik hier probeer op te lossen beter heb uitgelegd. Ik heb een langlopende goroutine die een groot bestand en een aantal andere goroutines downloadt die toegang tot de HTTP-headers nodig hebben wanneer ze beschikbaar zijn. Dit probleem is moeilijker dan het klinkt.

Ik kan geen kanalen gebruiken omdat slechts één goroutine de waarde zou krijgen. En sommige van de andere goroutines proberen de headers op te halen lang nadat ze al beschikbaar zijn.

De goroutine van de downloader kan eenvoudig de HTTP-headers in een variabele opslaan en een mutex gebruiken om de toegang tot deze variabelen te beveiligen. Dit biedt echter geen manier voor de andere goroutines om te "wachten" totdat ze beschikbaar zijn.

Ik had gedacht dat beide sync.Mutex en sync.Cond samen konden dit doel bereiken maar het lijkt erop dat dit niet mogelijk is.


11
2018-04-26 06:43


oorsprong


antwoorden:


OP beantwoordde de zijne, maar beantwoordde niet direct de oorspronkelijke vraag, ik ga posten hoe correct te gebruiken sync.Cond.

Je hebt het niet echt nodig sync.Cond als je één goroutine hebt voor elke schrijf en lees - een single sync.Mutex zou voldoende zijn om met hen te communiceren. sync.Cond kan nuttig zijn in situaties waarin meerdere lezers wachten totdat de gedeelde bronnen beschikbaar zijn.

var sharedRsc = make(map[string]interface{})
func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    m := sync.Mutex{}
    c := sync.NewCond(&m)
    go func() {
        // this go routine wait for changes to the sharedRsc
        c.L.Lock()
        for len(sharedRsc) == 0 {
            c.Wait()
        }
        fmt.Println(sharedRsc["rsc1"])
        c.L.Unlock()
        wg.Done()
    }()

    go func() {
        // this go routine wait for changes to the sharedRsc
        c.L.Lock()
        for len(sharedRsc) == 0 {
            c.Wait()
        }
        fmt.Println(sharedRsc["rsc2"])
        c.L.Unlock()
        wg.Done()
    }()

    // this one writes changes to sharedRsc
    c.L.Lock()
    sharedRsc["rsc1"] = "foo"
    sharedRsc["rsc2"] = "bar"
    c.Broadcast()
    c.L.Unlock()
    wg.Wait()
}

Speelplaats

Dat gezegd hebbende, is het gebruik van kanalen nog steeds de aanbevolen manier om gegevens door te geven als de situatie dit toelaat.

Notitie: sync.WaitGroup hier wordt alleen gewacht tot de goroutines klaar zijn met hun executies.


6
2018-03-13 20:25



U moet ervoor zorgen dat c.Broadcast wordt aangeroepen na uw oproep tot c.Wait. De juiste versie van uw programma zou zijn:

package main

import (
    "fmt"
    "sync"
)

func main() {
    m := &sync.Mutex{}
    c := sync.NewCond(m)
    m.Lock()
    go func() {
        m.Lock() // Wait for c.Wait()
        c.Broadcast()
        m.Unlock()
    }()
    c.Wait() // Unlocks m
}

https://play.golang.org/p/O1r8v8yW6h


4
2018-05-17 03:56



package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    m := sync.Mutex{}
    m.Lock() // main gouroutine is owner of lock
    c := sync.NewCond(&m)
    go func() {
        m.Lock() // obtain a lock
        defer m.Unlock()
        fmt.Println("3. goroutine is owner of lock")
        time.Sleep(2 * time.Second) // long computing - because you are the owner, you can change state variable(s)
        c.Broadcast()               // State has been changed, publish it to waiting goroutines
        fmt.Println("4. goroutine will release lock soon (deffered Unlock")
    }()
    fmt.Println("1. main goroutine is owner of lock")
    time.Sleep(1 * time.Second) // initialization
    fmt.Println("2. main goroutine is still lockek")
    c.Wait() // Wait temporarily release a mutex during wating and give opportunity to other goroutines to change the state.
    // Because you don't know, whether this is state, that you are waiting for, is usually called in loop.
    m.Unlock()
    fmt.Println("Done")
}

http://play.golang.org/p/fBBwoL7_pm


2
2018-04-26 08:14



Het lijkt erop dat je c. Wacht op uitzending, wat nooit zou gebeuren met je tijdsintervallen. Met

time.Sleep(3 * time.Second) //Broadcast after any Wait for it
c.Broadcast()

je fragment lijkt te werken http://play.golang.org/p/OE8aP4i6gY . Of mis ik iets dat je probeert te bereiken?


2
2018-04-26 10:27



Ik heb eindelijk een manier ontdekt om dit te doen en het gaat niet om sync.Cond helemaal - alleen de mutex.

type Task struct {
    m       sync.Mutex
    headers http.Header
}

func NewTask() *Task {
    t := &Task{}
    t.m.Lock()
    go func() {
        defer t.m.Unlock()
        // ...do stuff...
    }()
    return t
}

func (t *Task) WaitFor() http.Header {
    t.m.Lock()
    defer t.m.Unlock()
    return t.headers
}

Hoe werkt dit?

De mutex is aan het begin van de taak vergrendeld en zorgt ervoor dat alles wat belt WaitFor()zal blokkeren. Zodra de headers beschikbaar zijn en de mutex wordt ontgrendeld door de goroutine, moet elke oproep naar WaitFor() wordt één voor één uitgevoerd. Alle toekomstige oproepen (zelfs nadat de goroutine is afgelopen) hebben geen probleem om de mutex te vergrendelen, omdat deze altijd ontgrendeld blijft.


1
2018-05-06 04:37



Hier is een praktisch voorbeeld met twee routines. Ze beginnen de een na de ander, maar de tweede wacht op een voorwaarde die door de eerste wordt uitgezonden voordat hij doorgaat:

package main

import (
    "sync"
    "fmt"
    "time"
)

func main() {
    lock := sync.Mutex{}
    lock.Lock()

    cond := sync.NewCond(&lock)

    waitGroup := sync.WaitGroup{}
    waitGroup.Add(2)

    go func() {
        defer waitGroup.Done()

        fmt.Println("First go routine has started and waits for 1 second before broadcasting condition")

        time.Sleep(1 * time.Second)

        fmt.Println("First go routine broadcasts condition")

        cond.Broadcast()
    }()

    go func() {
        defer waitGroup.Done()

        fmt.Println("Second go routine has started and is waiting on condition")

        cond.Wait()

        fmt.Println("Second go routine unlocked by condition broadcast")
    }()

    fmt.Println("Main go routine starts waiting")

    waitGroup.Wait()

    fmt.Println("Main go routine ends")
}

De uitvoer kan enigszins variëren omdat de tweede-startroutine kan beginnen vóór de eerste en omgekeerd:

Main go routine starts waiting
Second go routine has started and is waiting on condition
First go routine has started and waits for 1 second before broadcasting condition
First go routine broadcasts condition
Second go routine unlocked by condition broadcast
Main go routine ends

https://gist.github.com/fracasula/21565ea1cf0c15726ca38736031edc70


1
2017-08-03 14:16