all: use zstdframe where sensible (#11491)

Use the zstdframe package where sensible instead of plumbing
around our own zstd.Encoder just for stateless operations.

This causes logtail to have a dependency on zstd,
but that's arguably okay since zstd support is implicit
to the protocol between a client and the logging service.
Also, virtually every caller to logger.NewLogger was
manually setting up a zstd.Encoder anyways,
meaning that zstd was functionally always a dependency.

Updates #cleanup
Updates tailscale/corp#18514

Signed-off-by: Joe Tsai <joetsai@digital-static.net>
irbekrm/maybe_fix_v6
Joe Tsai 8 months ago committed by GitHub
parent d4bfe34ba7
commit 85febda86d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -119,7 +119,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
github.com/klauspost/compress/huff0 from github.com/klauspost/compress/zstd github.com/klauspost/compress/huff0 from github.com/klauspost/compress/zstd
github.com/klauspost/compress/internal/cpuinfo from github.com/klauspost/compress/huff0+ github.com/klauspost/compress/internal/cpuinfo from github.com/klauspost/compress/huff0+
github.com/klauspost/compress/internal/snapref from github.com/klauspost/compress/zstd github.com/klauspost/compress/internal/snapref from github.com/klauspost/compress/zstd
github.com/klauspost/compress/zstd from tailscale.com/smallzstd github.com/klauspost/compress/zstd from tailscale.com/util/zstdframe
github.com/klauspost/compress/zstd/internal/xxhash from github.com/klauspost/compress/zstd github.com/klauspost/compress/zstd/internal/xxhash from github.com/klauspost/compress/zstd
github.com/kortschak/wol from tailscale.com/ipn/ipnlocal github.com/kortschak/wol from tailscale.com/ipn/ipnlocal
LD github.com/kr/fs from github.com/pkg/sftp LD github.com/kr/fs from github.com/pkg/sftp
@ -315,7 +315,6 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
tailscale.com/posture from tailscale.com/ipn/ipnlocal tailscale.com/posture from tailscale.com/ipn/ipnlocal
tailscale.com/proxymap from tailscale.com/tsd+ tailscale.com/proxymap from tailscale.com/tsd+
💣 tailscale.com/safesocket from tailscale.com/client/tailscale+ 💣 tailscale.com/safesocket from tailscale.com/client/tailscale+
tailscale.com/smallzstd from tailscale.com/control/controlclient+
LD 💣 tailscale.com/ssh/tailssh from tailscale.com/cmd/tailscaled LD 💣 tailscale.com/ssh/tailssh from tailscale.com/cmd/tailscaled
tailscale.com/syncs from tailscale.com/cmd/tailscaled+ tailscale.com/syncs from tailscale.com/cmd/tailscaled+
tailscale.com/tailcfg from tailscale.com/client/tailscale+ tailscale.com/tailcfg from tailscale.com/client/tailscale+
@ -393,6 +392,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
💣 tailscale.com/util/winutil from tailscale.com/clientupdate+ 💣 tailscale.com/util/winutil from tailscale.com/clientupdate+
W 💣 tailscale.com/util/winutil/authenticode from tailscale.com/clientupdate+ W 💣 tailscale.com/util/winutil/authenticode from tailscale.com/clientupdate+
W tailscale.com/util/winutil/policy from tailscale.com/ipn/ipnlocal W tailscale.com/util/winutil/policy from tailscale.com/ipn/ipnlocal
tailscale.com/util/zstdframe from tailscale.com/control/controlclient+
tailscale.com/version from tailscale.com/client/web+ tailscale.com/version from tailscale.com/client/web+
tailscale.com/version/distro from tailscale.com/client/web+ tailscale.com/version/distro from tailscale.com/client/web+
W tailscale.com/wf from tailscale.com/cmd/tailscaled W tailscale.com/wf from tailscale.com/cmd/tailscaled

@ -42,7 +42,6 @@ import (
"tailscale.com/net/tlsdial" "tailscale.com/net/tlsdial"
"tailscale.com/net/tsdial" "tailscale.com/net/tsdial"
"tailscale.com/net/tshttpproxy" "tailscale.com/net/tshttpproxy"
"tailscale.com/smallzstd"
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
"tailscale.com/tka" "tailscale.com/tka"
"tailscale.com/tstime" "tailscale.com/tstime"
@ -57,6 +56,7 @@ import (
"tailscale.com/util/singleflight" "tailscale.com/util/singleflight"
"tailscale.com/util/syspolicy" "tailscale.com/util/syspolicy"
"tailscale.com/util/systemd" "tailscale.com/util/systemd"
"tailscale.com/util/zstdframe"
) )
// Direct is the client that connects to a tailcontrol server for a node. // Direct is the client that connects to a tailcontrol server for a node.
@ -180,11 +180,6 @@ type Pinger interface {
Ping(ctx context.Context, ip netip.Addr, pingType tailcfg.PingType, size int) (*ipnstate.PingResult, error) Ping(ctx context.Context, ip netip.Addr, pingType tailcfg.PingType, size int) (*ipnstate.PingResult, error)
} }
type Decompressor interface {
DecodeAll(input, dst []byte) ([]byte, error)
Close()
}
// NetmapUpdater is the interface needed by the controlclient to enact change in // NetmapUpdater is the interface needed by the controlclient to enact change in
// the world as a function of updates received from the network. // the world as a function of updates received from the network.
type NetmapUpdater interface { type NetmapUpdater interface {
@ -1208,12 +1203,7 @@ func (c *Direct) decodeMsg(msg []byte, v any, mkey key.MachinePrivate) error {
} else { } else {
decrypted = msg decrypted = msg
} }
decoder, err := smallzstd.NewDecoder(nil) b, err := zstdframe.AppendDecode(nil, decrypted)
if err != nil {
return err
}
defer decoder.Close()
b, err := decoder.DecodeAll(decrypted, nil)
if err != nil { if err != nil {
return err return err
} }

@ -22,7 +22,6 @@ import (
"tailscale.com/logtail/filch" "tailscale.com/logtail/filch"
"tailscale.com/net/netmon" "tailscale.com/net/netmon"
"tailscale.com/net/sockstats" "tailscale.com/net/sockstats"
"tailscale.com/smallzstd"
"tailscale.com/types/logger" "tailscale.com/types/logger"
"tailscale.com/types/logid" "tailscale.com/types/logid"
"tailscale.com/util/mak" "tailscale.com/util/mak"
@ -121,13 +120,7 @@ func NewLogger(logdir string, logf logger.Logf, logID logid.PublicID, netMon *ne
PrivateID: SockstatLogID(logID), PrivateID: SockstatLogID(logID),
Collection: "sockstats.log.tailscale.io", Collection: "sockstats.log.tailscale.io",
Buffer: filch, Buffer: filch,
NewZstdEncoder: func() logtail.Encoder { CompressLogs: true,
w, err := smallzstd.NewEncoder(nil)
if err != nil {
panic(err)
}
return w
},
FlushDelayFn: func() time.Duration { FlushDelayFn: func() time.Duration {
// set flush delay to 100 years so it never flushes automatically // set flush delay to 100 years so it never flushes automatically
return 100 * 365 * 24 * time.Hour return 100 * 365 * 24 * time.Hour

@ -42,7 +42,6 @@ import (
"tailscale.com/net/tshttpproxy" "tailscale.com/net/tshttpproxy"
"tailscale.com/paths" "tailscale.com/paths"
"tailscale.com/safesocket" "tailscale.com/safesocket"
"tailscale.com/smallzstd"
"tailscale.com/types/logger" "tailscale.com/types/logger"
"tailscale.com/types/logid" "tailscale.com/types/logid"
"tailscale.com/util/clientmetric" "tailscale.com/util/clientmetric"
@ -554,13 +553,7 @@ func NewWithConfigPath(collection, dir, cmdName string, netMon *netmon.Monitor,
Collection: newc.Collection, Collection: newc.Collection,
PrivateID: newc.PrivateID, PrivateID: newc.PrivateID,
Stderr: logWriter{console}, Stderr: logWriter{console},
NewZstdEncoder: func() logtail.Encoder { CompressLogs: true,
w, err := smallzstd.NewEncoder(nil)
if err != nil {
panic(err)
}
return w
},
HTTPC: &http.Client{Transport: NewLogtailTransport(logtail.DefaultHost, netMon, logf)}, HTTPC: &http.Client{Transport: NewLogtailTransport(logtail.DefaultHost, netMon, logf)},
} }
if collection == logtail.CollectionNode { if collection == logtail.CollectionNode {

@ -31,6 +31,7 @@ import (
tslogger "tailscale.com/types/logger" tslogger "tailscale.com/types/logger"
"tailscale.com/types/logid" "tailscale.com/types/logid"
"tailscale.com/util/set" "tailscale.com/util/set"
"tailscale.com/util/zstdframe"
) )
// DefaultHost is the default host name to upload logs to when // DefaultHost is the default host name to upload logs to when
@ -62,6 +63,9 @@ type Config struct {
Stderr io.Writer // if set, logs are sent here instead of os.Stderr Stderr io.Writer // if set, logs are sent here instead of os.Stderr
StderrLevel int // max verbosity level to write to stderr; 0 means the non-verbose messages only StderrLevel int // max verbosity level to write to stderr; 0 means the non-verbose messages only
Buffer Buffer // temp storage, if nil a MemoryBuffer Buffer Buffer // temp storage, if nil a MemoryBuffer
CompressLogs bool // whether to compress the log uploads
// Deprecated: Use CompressUploads instead.
NewZstdEncoder func() Encoder // if set, used to compress logs for transmission NewZstdEncoder func() Encoder // if set, used to compress logs for transmission
// MetricsDelta, if non-nil, is a func that returns an encoding // MetricsDelta, if non-nil, is a func that returns an encoding
@ -156,6 +160,7 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
shutdownDone: make(chan struct{}), shutdownDone: make(chan struct{}),
} }
l.SetSockstatsLabel(sockstats.LabelLogtailLogger) l.SetSockstatsLabel(sockstats.LabelLogtailLogger)
l.compressLogs = cfg.CompressLogs
if cfg.NewZstdEncoder != nil { if cfg.NewZstdEncoder != nil {
l.zstdEncoder = cfg.NewZstdEncoder() l.zstdEncoder = cfg.NewZstdEncoder()
} }
@ -184,6 +189,7 @@ type Logger struct {
flushPending atomic.Bool flushPending atomic.Bool
sentinel chan int32 sentinel chan int32
clock tstime.Clock clock tstime.Clock
compressLogs bool
zstdEncoder Encoder zstdEncoder Encoder
uploadCancel func() uploadCancel func()
explainedRaw bool explainedRaw bool
@ -364,8 +370,18 @@ func (l *Logger) uploading(ctx context.Context) {
body := l.drainPending(scratch) body := l.drainPending(scratch)
origlen := -1 // sentinel value: uncompressed origlen := -1 // sentinel value: uncompressed
// Don't attempt to compress tiny bodies; not worth the CPU cycles. // Don't attempt to compress tiny bodies; not worth the CPU cycles.
if l.zstdEncoder != nil && len(body) > 256 { if (l.compressLogs || l.zstdEncoder != nil) && len(body) > 256 {
zbody := l.zstdEncoder.EncodeAll(body, nil) var zbody []byte
switch {
case l.zstdEncoder != nil:
zbody = l.zstdEncoder.EncodeAll(body, nil)
case l.lowMem:
zbody = zstdframe.AppendEncode(nil, body,
zstdframe.FastestCompression, zstdframe.LowMemory(true))
default:
zbody = zstdframe.AppendEncode(nil, body)
}
// Only send it compressed if the bandwidth savings are sufficient. // Only send it compressed if the bandwidth savings are sufficient.
// Just the extra headers associated with enabling compression // Just the extra headers associated with enabling compression
// are 50 bytes by themselves. // are 50 bytes by themselves.

@ -46,7 +46,6 @@ import (
"tailscale.com/net/proxymux" "tailscale.com/net/proxymux"
"tailscale.com/net/socks5" "tailscale.com/net/socks5"
"tailscale.com/net/tsdial" "tailscale.com/net/tsdial"
"tailscale.com/smallzstd"
"tailscale.com/tsd" "tailscale.com/tsd"
"tailscale.com/types/logger" "tailscale.com/types/logger"
"tailscale.com/types/logid" "tailscale.com/types/logid"
@ -655,13 +654,7 @@ func (s *Server) startLogger(closePool *closeOnErrorPool) error {
PrivateID: lpc.PrivateID, PrivateID: lpc.PrivateID,
Stderr: io.Discard, // log everything to Buffer Stderr: io.Discard, // log everything to Buffer
Buffer: s.logbuffer, Buffer: s.logbuffer,
NewZstdEncoder: func() logtail.Encoder { CompressLogs: true,
w, err := smallzstd.NewEncoder(nil)
if err != nil {
panic(err)
}
return w
},
HTTPC: &http.Client{Transport: logpolicy.NewLogtailTransport(logtail.DefaultHost, s.netMon, s.logf)}, HTTPC: &http.Client{Transport: logpolicy.NewLogtailTransport(logtail.DefaultHost, s.netMon, s.logf)},
MetricsDelta: clientmetric.EncodeLogTailMetricsDelta, MetricsDelta: clientmetric.EncodeLogTailMetricsDelta,
} }

@ -27,17 +27,16 @@ import (
"testing" "testing"
"time" "time"
"github.com/klauspost/compress/zstd"
"go4.org/mem" "go4.org/mem"
"tailscale.com/derp" "tailscale.com/derp"
"tailscale.com/derp/derphttp" "tailscale.com/derp/derphttp"
"tailscale.com/net/stun/stuntest" "tailscale.com/net/stun/stuntest"
"tailscale.com/smallzstd"
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
"tailscale.com/types/key" "tailscale.com/types/key"
"tailscale.com/types/logger" "tailscale.com/types/logger"
"tailscale.com/types/logid" "tailscale.com/types/logid"
"tailscale.com/types/nettype" "tailscale.com/types/nettype"
"tailscale.com/util/zstdframe"
"tailscale.com/version" "tailscale.com/version"
) )
@ -302,20 +301,19 @@ func (lc *LogCatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Printf("bad log ID: %q: %v", r.URL.Path, err) log.Printf("bad log ID: %q: %v", r.URL.Path, err)
} }
var body io.Reader = r.Body bodyBytes, err := io.ReadAll(r.Body)
if err != nil {
log.Printf("http.Request.Body.Read: %v", err)
return
}
if r.Header.Get("Content-Encoding") == "zstd" { if r.Header.Get("Content-Encoding") == "zstd" {
var err error bodyBytes, err = zstdframe.AppendDecode(nil, bodyBytes)
var dec *zstd.Decoder
dec, err = smallzstd.NewDecoder(body)
if err != nil { if err != nil {
log.Printf("bad caught zstd: %v", err) log.Printf("zstdframe.AppendDecode: %v", err)
http.Error(w, err.Error(), 400) http.Error(w, err.Error(), 400)
return return
} }
defer dec.Close()
body = dec
} }
bodyBytes, _ := io.ReadAll(body)
type Entry struct { type Entry struct {
Logtail struct { Logtail struct {

@ -24,11 +24,9 @@ import (
"sync" "sync"
"time" "time"
"github.com/klauspost/compress/zstd"
"go4.org/mem" "go4.org/mem"
"tailscale.com/net/netaddr" "tailscale.com/net/netaddr"
"tailscale.com/net/tsaddr" "tailscale.com/net/tsaddr"
"tailscale.com/smallzstd"
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
"tailscale.com/types/key" "tailscale.com/types/key"
"tailscale.com/types/logger" "tailscale.com/types/logger"
@ -37,6 +35,7 @@ import (
"tailscale.com/util/must" "tailscale.com/util/must"
"tailscale.com/util/rands" "tailscale.com/util/rands"
"tailscale.com/util/set" "tailscale.com/util/set"
"tailscale.com/util/zstdframe"
) )
const msgLimit = 1 << 20 // encrypted message length limit const msgLimit = 1 << 20 // encrypted message length limit
@ -1047,16 +1046,6 @@ func (s *Server) decode(mkey key.MachinePublic, msg []byte, v any) error {
return json.Unmarshal(decrypted, v) return json.Unmarshal(decrypted, v)
} }
var zstdEncoderPool = &sync.Pool{
New: func() any {
encoder, err := smallzstd.NewEncoder(nil, zstd.WithEncoderLevel(zstd.SpeedFastest))
if err != nil {
panic(err)
}
return encoder
},
}
func (s *Server) encode(mkey key.MachinePublic, compress bool, v any) (b []byte, err error) { func (s *Server) encode(mkey key.MachinePublic, compress bool, v any) (b []byte, err error) {
var isBytes bool var isBytes bool
if b, isBytes = v.([]byte); !isBytes { if b, isBytes = v.([]byte); !isBytes {
@ -1066,10 +1055,7 @@ func (s *Server) encode(mkey key.MachinePublic, compress bool, v any) (b []byte,
} }
} }
if compress { if compress {
encoder := zstdEncoderPool.Get().(*zstd.Encoder) b = zstdframe.AppendEncode(nil, b, zstdframe.FastestCompression)
b = encoder.EncodeAll(b, nil)
encoder.Close()
zstdEncoderPool.Put(encoder)
} }
return s.privateKey().SealTo(mkey, b), nil return s.privateKey().SealTo(mkey, b), nil
} }

@ -22,7 +22,6 @@ import (
"tailscale.com/net/netmon" "tailscale.com/net/netmon"
"tailscale.com/net/sockstats" "tailscale.com/net/sockstats"
"tailscale.com/net/tsaddr" "tailscale.com/net/tsaddr"
"tailscale.com/smallzstd"
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
"tailscale.com/types/logid" "tailscale.com/types/logid"
"tailscale.com/types/netlogtype" "tailscale.com/types/netlogtype"
@ -111,15 +110,9 @@ func (nl *Logger) Startup(nodeID tailcfg.StableNodeID, nodeLogID, domainLogID lo
PrivateID: nodeLogID, PrivateID: nodeLogID,
CopyPrivateID: domainLogID, CopyPrivateID: domainLogID,
Stderr: io.Discard, Stderr: io.Discard,
// TODO(joetsai): Set Buffer? Use an in-memory buffer for now. CompressLogs: true,
NewZstdEncoder: func() logtail.Encoder {
w, err := smallzstd.NewEncoder(nil)
if err != nil {
panic(err)
}
return w
},
HTTPC: httpc, HTTPC: httpc,
// TODO(joetsai): Set Buffer? Use an in-memory buffer for now.
// Include process sequence numbers to identify missing samples. // Include process sequence numbers to identify missing samples.
IncludeProcID: true, IncludeProcID: true,

Loading…
Cancel
Save