Compare commits

...

8 Commits

Author SHA1 Message Date
Aaron Klotz d72494bac7 VERSION.txt: this is v1.86.2
Signed-off-by: Aaron Klotz <aaron@tailscale.com>
6 months ago
Tom Meadows a277abcae8
k8s-operator: adding session type to cast header (#16660) (#16689)
Updates #16490

Signed-off-by: chaosinthecrd <tom@tmlabs.co.uk>
6 months ago
Jordan Whited 50a476fbc4
wgengine/magicsock: fix magicsock deadlock around Conn.NoteRecvActivity (#16687) (#16696)
Updates #16651
Updates tailscale/corp#30836

(cherry picked from commit a9f3fd1c67)

Signed-off-by: Jordan Whited <jordan@tailscale.com>
6 months ago
M. J. Fromberger 9c7305074a net/portmapper: avert a panic when a mapping is not available (#16686)
Ideally when we attempt to create a new port mapping, we should not return
without error when no mapping is available. We already log these cases as
unexpected, so this change just avoids panicking dispatch on the invalid
result in those cases. We still separately need to fix the underlying
control flow.
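The shape of the mitigation, as a standalone sketch (the Mapping type and getMapping here are hypothetical stand-ins, not the portmapper API):

package main

import "log"

// Mapping is a hypothetical stand-in for the portmapper's mapping result.
type Mapping struct{ Port int }

// getMapping stands in for createOrGetMapping: on the unexpected path this
// commit guards against, it can return (nil, nil).
func getMapping() (*Mapping, error) { return nil, nil }

func main() {
	mapping, err := getMapping()
	if err != nil {
		log.Printf("createOrGetMapping: %v", err)
		return
	} else if mapping == nil {
		// Already logged as unexpected upstream; returning here averts the
		// panic rather than dispatching on the invalid result.
		return
	}
	log.Printf("mapping on port %d", mapping.Port) // safe: non-nil past the guard
}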

Updates #16662

Change-Id: I51e8a116b922b49eda45e31cd27f6b89dd51abc8

Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
(cherry picked from commit 5ce3845a02)
6 months ago
Nick Khyl 4123469edf util/syspolicy/setting: use a custom marshaler for time.Duration
jsonv2 now returns an error when you marshal or unmarshal a time.Duration
without an explicit format flag. This is an intentional, temporary choice until
the default [time.Duration] representation is decided (see golang/go#71631).

setting.Snapshot can hold time.Duration values inside a map[string]any,
so the jsonv2 update breaks marshaling. In this PR, we start using
a custom marshaler until that decision is made or golang/go#71664
lets us specify the format explicitly.

This fixes `tailscale syspolicy list` failing when KeyExpirationNotice
or any other time.Duration policy setting is configured.
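A minimal sketch of the approach, using the same go-json-experiment/json calls that appear in the diff further down; the map key and value here are illustrative, not the Snapshot internals:

package main

import (
	"fmt"
	"time"

	jsonv2 "github.com/go-json-experiment/json"
	"github.com/go-json-experiment/json/jsontext"
)

// Marshal time.Duration via its String() form ("2h0m0s") so jsonv2 does not
// return an error while the default representation is undecided upstream.
var durationAsUnits = jsonv2.JoinOptions(
	jsonv2.WithMarshalers(jsonv2.MarshalToFunc(func(e *jsontext.Encoder, d time.Duration) error {
		return e.WriteToken(jsontext.String(d.String()))
	})),
)

func main() {
	// A time.Duration hiding inside a map[string]any, as in setting.Snapshot.
	v := map[string]any{"KeyExpirationNotice": 2 * time.Hour}
	b, err := jsonv2.Marshal(v, durationAsUnits)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"KeyExpirationNotice":"2h0m0s"}
}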

Fixes #16683

Signed-off-by: Nick Khyl <nickk@tailscale.com>
(cherry picked from commit 4df02bbb48)
6 months ago
Tom Proctor 91d65e03e8
k8s-operator: handle multiple WebSocket frames per read (#16678) (#16679)
Cherry picks bug fix #16678 and flake fix #16680 onto the 1.86 release branch.

When kubectl starts an interactive attach session, it sends 2 resize
messages in quick succession. It seems that particularly in HTTP mode,
we often receive both of these WebSocket frames from the underlying
connection in a single read. However, our parser currently assumes 0-1
frames per read, and leaves the second frame in the read buffer until
the next read from the underlying connection. It doesn't take long after
that before we end up failing to skip a control message as we normally
should, and then we parse a control message as though it will have a
stream ID (part of the Kubernetes protocol) and error out.

Instead, we should keep parsing frames from the read buffer for as long
as we're able to parse complete frames, so this commit refactors the
message parsing logic into a loop that runs while the read buffer is
non-empty.

See k/k staging/src/k8s.io/kubectl/pkg/cmd/attach/attach.go for full
details of the resize messages.

There are at least a couple more multiple-frame read edge cases we
should handle, but this commit is very conservatively fixing a single
observed issue to make it a low-risk candidate for cherry picking.
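A simplified sketch of that loop shape (not the operator's actual parser): drain every complete frame from the buffer before returning, using the unmasked frame layout the tests below spell out, where byte 0 is FIN|opcode, byte 1 is the payload length, and the first payload byte is the Kubernetes stream ID:

package main

import (
	"bytes"
	"fmt"
)

// parseFrames keeps consuming frames while the buffer still holds a complete
// one, instead of assuming 0-1 frames per read. Assumes unmasked frames with
// payloads under 126 bytes, each carrying a stream ID byte.
func parseFrames(buf *bytes.Buffer) {
	for buf.Len() != 0 {
		b := buf.Bytes()
		if len(b) < 2 {
			return // incomplete header; wait for the next read
		}
		payloadLen := int(b[1] & 0x7f)
		if buf.Len() < 2+payloadLen {
			return // incomplete frame; wait for the next read
		}
		frame := buf.Next(2 + payloadLen)
		fin, opcode := frame[0]&0x80 != 0, frame[0]&0x0f
		fmt.Printf("fin=%v opcode=%#x streamID=%d payload=%q\n",
			fin, opcode, frame[2], frame[3:])
	}
}

func main() {
	var buf bytes.Buffer
	// Two complete frames delivered by a single read, as in the bug report.
	buf.Write([]byte{0x82, 0x03, 0x04, 'h', 'i'})
	buf.Write([]byte{0x82, 0x04, 0x04, 'y', 'o', '!'})
	parseFrames(&buf)
}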

Updates #13358

Change-Id: Iafb91ad1cbeed9c5231a1525d4563164fc1f002f

Signed-off-by: Tom Proctor <tomhjp@users.noreply.github.com>
6 months ago
Aaron Klotz fdcff402fb VERSION.txt: this is v1.86.1
Signed-off-by: Aaron Klotz <aaron@tailscale.com>
6 months ago
Aaron Klotz 758dfe7203 VERSION.txt: this is v1.86.0
Signed-off-by: Aaron Klotz <aaron@tailscale.com>
6 months ago

@@ -1 +1 @@
-1.85.0
+1.86.2

@@ -114,8 +114,9 @@ func (ap *APIServerProxy) Run(ctx context.Context) error {
 	mux.HandleFunc("GET /api/v1/namespaces/{namespace}/pods/{pod}/attach", ap.serveAttachWS)
 	ap.hs = &http.Server{
-		Handler:  mux,
-		ErrorLog: zap.NewStdLog(ap.log.Desugar()),
+		Handler:      mux,
+		ErrorLog:     zap.NewStdLog(ap.log.Desugar()),
+		TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
 	}
 	mode := "noauth"
@@ -140,7 +141,6 @@ func (ap *APIServerProxy) Run(ctx context.Context) error {
 			GetCertificate: ap.lc.GetCertificate,
 			NextProtos:     []string{"http/1.1"},
 		}
-		ap.hs.TLSNextProto = make(map[string]func(*http.Server, *tls.Conn, http.Handler))
 	} else {
 		var err error
 		tsLn, err = ap.ts.Listen("tcp", ":80")

@@ -184,9 +184,10 @@ func (h *Hijacker) setUpRecording(ctx context.Context, conn net.Conn) (net.Conn, error) {
 		SrcNode:   strings.TrimSuffix(h.who.Node.Name, "."),
 		SrcNodeID: h.who.Node.StableID,
 		Kubernetes: &sessionrecording.Kubernetes{
-			PodName:   h.pod,
-			Namespace: h.ns,
-			Container: container,
+			PodName:     h.pod,
+			Namespace:   h.ns,
+			Container:   container,
+			SessionType: string(h.sessionType),
 		},
 	}
 	if !h.who.Node.IsTagged() {
@@ -236,7 +237,6 @@ func (h *Hijacker) setUpRecording(ctx context.Context, conn net.Conn) (net.Conn, error) {
 		if err := lc.Close(); err != nil {
 			h.log.Infof("error closing recorder connections: %v", err)
 		}
-		return
 	}()
 	return lc, nil
 }

@@ -148,6 +148,8 @@ func (c *conn) Read(b []byte) (int, error) {
 		return 0, nil
 	}
+	// TODO(tomhjp): If we get multiple frames in a single Read with different
+	// types, we may parse the second frame with the wrong type.
 	typ := messageType(opcode(b))
 	if (typ == noOpcode && c.readMsgIsIncomplete()) || c.readBufHasIncompleteFragment() { // subsequent fragment
 		if typ, err = c.curReadMsgType(); err != nil {
@@ -157,6 +159,8 @@ func (c *conn) Read(b []byte) (int, error) {
 	// A control message can not be fragmented and we are not interested in
 	// these messages. Just return.
+	// TODO(tomhjp): If we get multiple frames in a single Read, we may skip
+	// some non-control messages.
 	if isControlMessage(typ) {
 		return n, nil
 	}
@@ -169,62 +173,65 @@ func (c *conn) Read(b []byte) (int, error) {
 		return n, nil
 	}
-	readMsg := &message{typ: typ} // start a new message...
-	// ... or pick up an already started one if the previous fragment was not final.
-	if c.readMsgIsIncomplete() || c.readBufHasIncompleteFragment() {
-		readMsg = c.currentReadMsg
-	}
 	if _, err := c.readBuf.Write(b[:n]); err != nil {
 		return 0, fmt.Errorf("[unexpected] error writing message contents to read buffer: %w", err)
 	}
-	ok, err := readMsg.Parse(c.readBuf.Bytes(), c.log)
-	if err != nil {
-		return 0, fmt.Errorf("error parsing message: %v", err)
-	}
-	if !ok { // incomplete fragment
-		return n, nil
-	}
-	c.readBuf.Next(len(readMsg.raw))
-	if readMsg.isFinalized && !c.readMsgIsIncomplete() {
-		// we want to send stream resize messages for terminal sessions
-		// Stream IDs for websocket streams are static.
-		// https://github.com/kubernetes/client-go/blob/v0.30.0-rc.1/tools/remotecommand/websocket.go#L218
-		if readMsg.streamID.Load() == remotecommand.StreamResize && c.hasTerm {
-			var msg tsrecorder.ResizeMsg
-			if err = json.Unmarshal(readMsg.payload, &msg); err != nil {
-				return 0, fmt.Errorf("error umarshalling resize message: %w", err)
-			}
-			c.ch.Width = msg.Width
-			c.ch.Height = msg.Height
-			var isInitialResize bool
-			c.writeCastHeaderOnce.Do(func() {
-				isInitialResize = true
-				// If this is a session with a terminal attached,
-				// we must wait for the terminal width and
-				// height to be parsed from a resize message
-				// before sending CastHeader, else tsrecorder
-				// will not be able to play this recording.
-				err = c.rec.WriteCastHeader(c.ch)
-				close(c.initialCastHeaderSent)
-			})
-			if err != nil {
-				return 0, fmt.Errorf("error writing CastHeader: %w", err)
-			}
-			if !isInitialResize {
-				if err := c.rec.WriteResize(msg.Height, msg.Width); err != nil {
-					return 0, fmt.Errorf("error writing resize message: %w", err)
-				}
-			}
-		}
-	}
-	c.currentReadMsg = readMsg
+	for c.readBuf.Len() != 0 {
+		readMsg := &message{typ: typ} // start a new message...
+		// ... or pick up an already started one if the previous fragment was not final.
+		if c.readMsgIsIncomplete() {
+			readMsg = c.currentReadMsg
+		}
+		ok, err := readMsg.Parse(c.readBuf.Bytes(), c.log)
+		if err != nil {
+			return 0, fmt.Errorf("error parsing message: %v", err)
+		}
+		if !ok { // incomplete fragment
+			return n, nil
+		}
+		c.readBuf.Next(len(readMsg.raw))
+		if readMsg.isFinalized && !c.readMsgIsIncomplete() {
+			// we want to send stream resize messages for terminal sessions
+			// Stream IDs for websocket streams are static.
+			// https://github.com/kubernetes/client-go/blob/v0.30.0-rc.1/tools/remotecommand/websocket.go#L218
+			if readMsg.streamID.Load() == remotecommand.StreamResize && c.hasTerm {
+				var msg tsrecorder.ResizeMsg
+				if err = json.Unmarshal(readMsg.payload, &msg); err != nil {
+					return 0, fmt.Errorf("error umarshalling resize message: %w", err)
+				}
+				c.ch.Width = msg.Width
+				c.ch.Height = msg.Height
+				var isInitialResize bool
+				c.writeCastHeaderOnce.Do(func() {
+					isInitialResize = true
+					// If this is a session with a terminal attached,
+					// we must wait for the terminal width and
+					// height to be parsed from a resize message
+					// before sending CastHeader, else tsrecorder
+					// will not be able to play this recording.
+					err = c.rec.WriteCastHeader(c.ch)
+					close(c.initialCastHeaderSent)
+				})
+				if err != nil {
+					return 0, fmt.Errorf("error writing CastHeader: %w", err)
+				}
+				if !isInitialResize {
+					if err := c.rec.WriteResize(msg.Height, msg.Width); err != nil {
+						return 0, fmt.Errorf("error writing resize message: %w", err)
+					}
+				}
+			}
+		}
+		c.currentReadMsg = readMsg
+	}
 	return n, nil
 }

@@ -9,6 +9,7 @@ import (
 	"context"
 	"fmt"
 	"reflect"
+	"runtime/debug"
 	"testing"
 	"time"
@@ -58,15 +59,39 @@ func Test_conn_Read(t *testing.T) {
 			wantCastHeaderHeight: 20,
 		},
 		{
-			name:   "two_reads_resize_message",
-			inputs: [][]byte{{0x2, 0x9, 0x4, 0x7b, 0x22, 0x77, 0x69, 0x64, 0x74, 0x68, 0x22}, {0x80, 0x11, 0x4, 0x3a, 0x31, 0x30, 0x2c, 0x22, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74, 0x22, 0x3a, 0x32, 0x30, 0x7d}},
+			name: "resize_data_frame_two_in_one_read",
+			inputs: [][]byte{
+				fmt.Appendf(nil, "%s%s",
+					append([]byte{0x82, lenResizeMsgPayload}, testResizeMsg...),
+					append([]byte{0x82, lenResizeMsgPayload}, testResizeMsg...),
+				),
+			},
+			wantRecorded:         append(fakes.AsciinemaCastHeaderMsg(t, 10, 20), fakes.AsciinemaCastResizeMsg(t, 10, 20)...),
+			wantCastHeaderWidth:  10,
+			wantCastHeaderHeight: 20,
+		},
+		{
+			name: "two_reads_resize_message",
+			inputs: [][]byte{
+				// op, len, stream ID, `{"width`
+				{0x2, 0x9, 0x4, 0x7b, 0x22, 0x77, 0x69, 0x64, 0x74, 0x68, 0x22},
+				// op, len, stream ID, `:10,"height":20}`
+				{0x80, 0x11, 0x4, 0x3a, 0x31, 0x30, 0x2c, 0x22, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74, 0x22, 0x3a, 0x32, 0x30, 0x7d},
+			},
 			wantCastHeaderWidth:  10,
 			wantCastHeaderHeight: 20,
 			wantRecorded:         fakes.AsciinemaCastHeaderMsg(t, 10, 20),
 		},
 		{
-			name:   "three_reads_resize_message_with_split_fragment",
-			inputs: [][]byte{{0x2, 0x9, 0x4, 0x7b, 0x22, 0x77, 0x69, 0x64, 0x74, 0x68, 0x22}, {0x80, 0x11, 0x4, 0x3a, 0x31, 0x30, 0x2c, 0x22, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74}, {0x22, 0x3a, 0x32, 0x30, 0x7d}},
+			name: "three_reads_resize_message_with_split_fragment",
+			inputs: [][]byte{
+				// op, len, stream ID, `{"width"`
+				{0x2, 0x9, 0x4, 0x7b, 0x22, 0x77, 0x69, 0x64, 0x74, 0x68, 0x22},
+				// op, len, stream ID, `:10,"height`
+				{0x00, 0x0c, 0x4, 0x3a, 0x31, 0x30, 0x2c, 0x22, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74},
+				// op, len, stream ID, `":20}`
+				{0x80, 0x06, 0x4, 0x22, 0x3a, 0x32, 0x30, 0x7d},
+			},
 			wantCastHeaderWidth:  10,
 			wantCastHeaderHeight: 20,
 			wantRecorded:         fakes.AsciinemaCastHeaderMsg(t, 10, 20),
@@ -260,19 +285,28 @@ func Test_conn_WriteRand(t *testing.T) {
 	sr := &fakes.TestSessionRecorder{}
 	rec := tsrecorder.New(sr, cl, cl.Now(), true, zl.Sugar())
 	for i := range 100 {
-		tc := &fakes.TestConn{}
-		c := &conn{
-			Conn: tc,
-			log:  zl.Sugar(),
-			rec:  rec,
-		}
-		bb := fakes.RandomBytes(t)
-		for j, input := range bb {
-			f := func() {
-				c.Write(input)
-			}
-			testPanic(t, f, fmt.Sprintf("[%d %d] Write: panic parsing input of length %d first bytes %b current write message %+#v", i, j, len(input), firstBytes(input), c.currentWriteMsg))
-		}
+		t.Run(fmt.Sprintf("test_%d", i), func(t *testing.T) {
+			tc := &fakes.TestConn{}
+			c := &conn{
+				Conn:                  tc,
+				log:                   zl.Sugar(),
+				rec:                   rec,
+				ctx:                   context.Background(), // ctx must be non-nil.
+				initialCastHeaderSent: make(chan struct{}),
+			}
+			// Never block for random data.
+			c.writeCastHeaderOnce.Do(func() {
+				close(c.initialCastHeaderSent)
+			})
+			bb := fakes.RandomBytes(t)
+			for j, input := range bb {
+				f := func() {
+					c.Write(input)
+				}
+				testPanic(t, f, fmt.Sprintf("[%d %d] Write: panic parsing input of length %d first bytes %b current write message %+#v", i, j, len(input), firstBytes(input), c.currentWriteMsg))
+			}
+		})
 	}
 }
@@ -280,7 +314,7 @@ func testPanic(t *testing.T, f func(), msg string) {
 	t.Helper()
 	defer func() {
 		if r := recover(); r != nil {
-			t.Fatal(msg, r)
+			t.Fatal(msg, r, string(debug.Stack()))
 		}
 	}()
 	f()

@@ -7,10 +7,10 @@ package ws
 import (
 	"encoding/binary"
-	"errors"
 	"fmt"
 	"sync/atomic"
 
+	"github.com/pkg/errors"
 	"go.uber.org/zap"
 	"golang.org/x/net/websocket"
@@ -139,6 +139,8 @@ func (msg *message) Parse(b []byte, log *zap.SugaredLogger) (bool, error) {
 		return false, errors.New("[unexpected] received a message fragment with no stream ID")
 	}
+	// Stream ID will be one of the constants from:
+	// https://github.com/kubernetes/kubernetes/blob/f9ed14bf9b1119a2e091f4b487a3b54930661034/staging/src/k8s.io/apimachinery/pkg/util/remotecommand/constants.go#L57-L64
 	streamID := uint32(msgPayload[0])
 	if !isInitialFragment && msg.streamID.Load() != streamID {
 		return false, fmt.Errorf("[unexpected] received message fragments with mismatched streamIDs %d and %d", msg.streamID.Load(), streamID)

@@ -507,6 +507,13 @@ func (c *Client) createMapping() {
 			c.logf("createOrGetMapping: %v", err)
 		}
 		return
+	} else if mapping == nil {
+		return
+		// TODO(creachadair): This was already logged in createOrGetMapping.
+		// It really should not happen at all, but we will need to untangle
+		// the control flow to eliminate that possibility. Meanwhile, this
+		// mitigates a panic downstream, cf. #16662.
 	}
 	if c.updates != nil {
 		c.updates.Publish(Mapping{

@@ -167,7 +167,8 @@ type CapabilityVersion int
 // - 120: 2025-07-15: Client understands peer relay disco messages, and implements peer client and relay server functions
 // - 121: 2025-07-19: Client understands peer relay endpoint alloc with [disco.AllocateUDPRelayEndpointRequest] & [disco.AllocateUDPRelayEndpointResponse]
 // - 122: 2025-07-21: Client sends Hostinfo.ExitNodeID to report which exit node it has selected, if any.
-const CurrentCapabilityVersion CapabilityVersion = 122
+// - 123: 2025-07-28: fix deadlock regression from cryptokey routing change (issue #16651)
+const CurrentCapabilityVersion CapabilityVersion = 123
 
 // ID is an integer ID for a user, node, or login allocated by the
 // control plane.

@@ -9,6 +9,7 @@ import (
 	"maps"
 	"slices"
 	"strings"
+	"time"
 
 	jsonv2 "github.com/go-json-experiment/json"
 	"github.com/go-json-experiment/json/jsontext"
@@ -152,6 +153,24 @@
 	_ jsonv2.UnmarshalerFrom = (*Snapshot)(nil)
 )
 
+// As of 2025-07-28, jsonv2 no longer has a default representation for [time.Duration],
+// so we need to provide a custom marshaler.
+//
+// This is temporary until the decision on the default representation is made
+// (see https://github.com/golang/go/issues/71631#issuecomment-2981670799).
+//
+// In the future, we might either use the default representation (if compatible with
+// [time.Duration.String]) or specify something like json.WithFormat[time.Duration]("units")
+// when golang/go#71664 is implemented.
+//
+// TODO(nickkhyl): revisit this when the decision on the default [time.Duration]
+// representation is made in golang/go#71631 and/or golang/go#71664 is implemented.
+var formatDurationAsUnits = jsonv2.JoinOptions(
+	jsonv2.WithMarshalers(jsonv2.MarshalToFunc(func(e *jsontext.Encoder, t time.Duration) error {
+		return e.WriteToken(jsontext.String(t.String()))
+	})),
+)
+
 // MarshalJSONTo implements [jsonv2.MarshalerTo].
 func (s *Snapshot) MarshalJSONTo(out *jsontext.Encoder) error {
 	data := &snapshotJSON{}
@@ -159,7 +178,7 @@ func (s *Snapshot) MarshalJSONTo(out *jsontext.Encoder) error {
 		data.Summary = s.summary
 		data.Settings = s.m
 	}
-	return jsonv2.MarshalEncode(out, data)
+	return jsonv2.MarshalEncode(out, data, formatDurationAsUnits)
 }
 
 // UnmarshalJSONFrom implements [jsonv2.UnmarshalerFrom].

@@ -491,6 +491,18 @@ func TestMarshalUnmarshalSnapshot(t *testing.T) {
 			snapshot: NewSnapshot(map[Key]RawItem{"ListPolicy": RawItemOf([]string{"Value1", "Value2"})}),
 			wantJSON: `{"Settings": {"ListPolicy": {"Value": ["Value1", "Value2"]}}}`,
 		},
+		{
+			name:     "Duration/Zero",
+			snapshot: NewSnapshot(map[Key]RawItem{"DurationPolicy": RawItemOf(time.Duration(0))}),
+			wantJSON: `{"Settings": {"DurationPolicy": {"Value": "0s"}}}`,
+			wantBack: NewSnapshot(map[Key]RawItem{"DurationPolicy": RawItemOf("0s")}),
+		},
+		{
+			name:     "Duration/NonZero",
+			snapshot: NewSnapshot(map[Key]RawItem{"DurationPolicy": RawItemOf(2 * time.Hour)}),
+			wantJSON: `{"Settings": {"DurationPolicy": {"Value": "2h0m0s"}}}`,
+			wantBack: NewSnapshot(map[Key]RawItem{"DurationPolicy": RawItemOf("2h0m0s")}),
+		},
 		{
 			name: "Empty/With-Summary",
 			snapshot: NewSnapshot(

@@ -4119,8 +4119,11 @@ func (le *lazyEndpoint) InitiationMessagePublicKey(peerPublicKey [32]byte) {
 		return
 	}
 	le.c.mu.Lock()
-	defer le.c.mu.Unlock()
 	ep, ok := le.c.peerMap.endpointForNodeKey(pubKey)
+	// [Conn.mu] must not be held while [Conn.noteRecvActivity] is called, which
+	// [endpoint.noteRecvActivity] can end up calling. See
+	// [Options.NoteRecvActivity] docs.
+	le.c.mu.Unlock()
 	if !ok {
 		return
 	}
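The underlying rule, in isolation: don't hold a mutex across a callback that may itself acquire it. A sketch of that shape with illustrative names (not the magicsock internals):

package main

import (
	"fmt"
	"sync"
)

type conn struct {
	mu    sync.Mutex
	peers map[string]string
}

// lookupThenNotify releases mu before invoking the callback, because the
// callback may re-enter code that acquires mu; holding the lock across the
// call (e.g. via defer) is the deadlock shape this commit fixes.
func (c *conn) lookupThenNotify(key string, notify func(string)) {
	c.mu.Lock()
	ep, ok := c.peers[key]
	c.mu.Unlock() // release before the callback, never defer across it
	if !ok {
		return
	}
	notify(ep)
}

func main() {
	c := &conn{peers: map[string]string{"node1": "192.0.2.1:41641"}}
	c.lookupThenNotify("node1", func(ep string) {
		// Safe to take c.mu here because lookupThenNotify released it.
		c.mu.Lock()
		defer c.mu.Unlock()
		fmt.Println("endpoint:", ep)
	})
}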
