portlist: unexport all Poller fields, removing unused one, rework channels

Poller.C and Poller.c were duplicated for one caller. Add an accessor
returning the receive-only version instead. It'll inline.

Poller.Err was unused. Remove.

Then Poller is opaque.

The channel usage and shutdown was a bit sketchy. Clean it up.

And document some things.

Change-Id: I5669e54f51a6a13492cf5485c83133bda7ea3ce9
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
pull/6021/head
Brad Fitzpatrick 2 years ago committed by Brad Fitzpatrick
parent 46ce80758d
commit def089f9c9

@ -1530,7 +1530,7 @@ func dnsMapsEqual(new, old *netmap.NetworkMap) bool {
func (b *LocalBackend) readPoller() {
n := 0
for {
ports, ok := <-b.portpoll.C
ports, ok := <-b.portpoll.Updates()
if !ok {
return
}

@ -15,21 +15,17 @@ import (
// Poller scans the systems for listening ports periodically and sends
// the results to C.
type Poller struct {
// C received the list of ports periodically. It's closed when
// Run completes, after which Err can be checked.
C <-chan List
c chan List // unbuffered
// Err is the error from the final GetList call. It is only
// valid to read once C has been closed. Err is nil if Close
// is called or the context is canceled.
Err error
// closeCtx is the context that's canceled on Close.
closeCtx context.Context
closeCtxCancel context.CancelFunc
runDone chan struct{} // closed when Run completes
// scatch is memory for Poller.getList to reuse between calls.
scratch []Port
c chan List // the unconstrained version of the exported C above
quitCh chan struct{} // close this to force exit
prev List // most recent data
}
@ -41,9 +37,9 @@ func NewPoller() (*Poller, error) {
}
p := &Poller{
c: make(chan List),
quitCh: make(chan struct{}),
runDone: make(chan struct{}),
}
p.C = p.c
p.closeCtx, p.closeCtxCancel = context.WithCancel(context.Background())
// Do one initial poll synchronously so we can return an error
// early.
@ -55,49 +51,64 @@ func NewPoller() (*Poller, error) {
return p, nil
}
// Updates return the channel that receives port list updates.
//
// The channel is closed when the Poller is closed.
func (p *Poller) Updates() <-chan List { return p.c }
// Close closes the Poller.
// Run will return with a nil error.
func (p *Poller) Close() error {
select {
case <-p.quitCh:
p.closeCtxCancel()
<-p.runDone
return nil
default:
}
// send sends pl to p.c and returns whether it was successfully sent.
func (p *Poller) send(ctx context.Context, pl List) (sent bool, err error) {
select {
case p.c <- pl:
return true, nil
case <-ctx.Done():
return false, ctx.Err()
case <-p.closeCtx.Done():
return false, nil
}
close(p.quitCh)
<-p.C
return nil
}
// Run runs the Poller periodically until either the context
// is done, or the Close is called.
//
// Run may only be called once.
func (p *Poller) Run(ctx context.Context) error {
defer close(p.runDone)
defer close(p.c)
tick := time.NewTicker(pollInterval)
defer tick.Stop()
// Send out the pre-generated initial value
p.c <- p.prev
// Send out the pre-generated initial value.
if sent, err := p.send(ctx, p.prev); !sent {
return err
}
for {
select {
case <-tick.C:
pl, err := p.getList()
if err != nil {
p.Err = err
return err
}
if pl.sameInodes(p.prev) {
continue
}
p.prev = pl
select {
case p.c <- pl:
case <-ctx.Done():
return ctx.Err()
case <-p.quitCh:
return nil
if sent, err := p.send(ctx, p.prev); !sent {
return err
}
case <-ctx.Done():
return ctx.Err()
case <-p.quitCh:
case <-p.closeCtx.Done():
return nil
}
}

Loading…
Cancel
Save