tsdns: remove forwarding queue.

Two levels of queueing are unnecessary.
The resulting implementation performs as follows
under request bursts (`count` packets sent concurrently):

lost  count            avg latency
   0 /  256 (00.00%) - 28ms
   0 /  512 (00.00%) - 146ms
   0 /  768 (00.00%) - 166ms
   0 / 1024 (00.00%) - 416ms
  11 / 1280 (00.86%) - 430ms
 145 / 1536 (09.44%) - 715ms
 364 / 2048 (17.77%) - 836ms

Signed-off-by: Dmytro Shynkevych <dmytro@tailscale.com>
reviewable/pr720/r1
Dmytro Shynkevych 4 years ago
parent 34a7e7c12b
commit 7541982635
No known key found for this signature in database
GPG Key ID: FF5E2F3DAD97EA23

@ -22,11 +22,6 @@ import (
// headerBytes is the number of bytes in a DNS message header.
const headerBytes = 12
// forwardQueueSize is the maximal number of requests that can be pending delegation.
// Note that this is distinct from the number of requests that are pending a response,
// which is not limited (except by txid collisions).
const forwardQueueSize = 64
// connCount is the number of UDP connections to use for forwarding.
const connCount = 32
@ -138,7 +133,6 @@ func newForwarder(logf logger.Logf, responses chan Packet) *forwarder {
return &forwarder{
logf: logger.WithPrefix(logf, "forward: "),
responses: responses,
queue: make(chan forwardedPacket, forwardQueueSize),
closed: make(chan struct{}),
conns: make([]*net.UDPConn, connCount),
txMap: make(map[txid]forwardingRecord),
@ -155,11 +149,10 @@ func (f *forwarder) Start() error {
}
}
f.wg.Add(connCount + 2)
f.wg.Add(connCount + 1)
for idx, conn := range f.conns {
go f.recv(uint16(idx), conn)
}
go f.send()
go f.cleanMap()
return nil
@ -191,28 +184,13 @@ func (f *forwarder) setUpstreams(upstreams []net.Addr) {
f.mu.Unlock()
}
func (f *forwarder) send() {
defer f.wg.Done()
var packet forwardedPacket
for {
select {
case <-f.closed:
return
case packet = <-f.queue:
// continue
}
connIdx := rand.Intn(connCount)
conn := f.conns[connIdx]
_, err := conn.WriteTo(packet.payload, packet.dst)
if err != nil {
// Do not log errors due to expired deadline.
if !errors.Is(err, os.ErrDeadlineExceeded) {
f.logf("send: %v", err)
}
return
}
func (f *forwarder) send(packet []byte, dst net.Addr) {
connIdx := rand.Intn(connCount)
conn := f.conns[connIdx]
_, err := conn.WriteTo(packet, dst)
// Do not log errors due to expired deadline.
if err != nil && !errors.Is(err, os.ErrDeadlineExceeded) {
f.logf("send: %v", err)
}
}
@ -308,17 +286,8 @@ func (f *forwarder) forward(query Packet) error {
f.mu.Unlock()
packet := forwardedPacket{
payload: query.Payload,
}
for _, upstream := range upstreams {
packet.dst = upstream
select {
case <-f.closed:
return ErrClosed
case f.queue <- packet:
// continue
}
f.send(query.Payload, upstream)
}
return nil

@ -22,10 +22,10 @@ import (
// maxResponseBytes is the maximum size of a response from a Resolver.
const maxResponseBytes = 512
// pendingQueueSize is the maximal number of DNS requests that can await polling.
// queueSize is the maximal number of DNS requests that can await polling.
// If EnqueueRequest is called when this many requests are already pending,
// the request will be dropped to avoid blocking the caller.
const pendingQueueSize = 64
const queueSize = 64
// defaultTTL is the TTL of all responses from Resolver.
const defaultTTL = 600 * time.Second
@ -94,7 +94,7 @@ type ResolverConfig struct {
func NewResolver(config ResolverConfig) *Resolver {
r := &Resolver{
logf: logger.WithPrefix(config.Logf, "tsdns: "),
queue: make(chan Packet, pendingQueueSize),
queue: make(chan Packet, queueSize),
responses: make(chan Packet),
errors: make(chan error),
closed: make(chan struct{}),

Loading…
Cancel
Save