diff --git a/cmd/netlogfmt/main.go b/cmd/netlogfmt/main.go index b89520b5c..0ee3e260d 100644 --- a/cmd/netlogfmt/main.go +++ b/cmd/netlogfmt/main.go @@ -4,13 +4,16 @@ // netlogfmt parses a stream of JSON log messages from stdin and // formats the network traffic logs produced by "tailscale.com/wgengine/netlog" +// according to the schema in "tailscale.com/types/netlogtype.Message" // in a more humanly readable format. // // Example usage: // -// $ cat netlog.json | netlogfmt +// $ cat netlog.json | go run tailscale.com/cmd/netlogfmt // ========================================================================================= -// Time: 2022-10-13T20:23:09.644Z (5s) +// NodeID: n123456CNTRL +// Logged: 2022-10-13T20:23:10.165Z +// Window: 2022-10-13T20:23:09.644Z (5s) // --------------------------------------------------- Tx[P/s] Tx[B/s] Rx[P/s] Rx[B/s] // VirtualTraffic: 16.80 1.64Ki 11.20 1.03Ki // TCP: 100.109.51.95:22 -> 100.85.80.41:42912 16.00 1.59Ki 10.40 1008.84 @@ -37,8 +40,11 @@ import ( "strings" "time" + "github.com/dsnet/try" + jsonv2 "github.com/go-json-experiment/json" "golang.org/x/exp/maps" "golang.org/x/exp/slices" + "tailscale.com/logtail" "tailscale.com/types/netlogtype" "tailscale.com/util/must" ) @@ -49,156 +55,232 @@ var ( tailnetName = flag.String("tailnet-name", "", "tailnet domain name to lookup devices in; see https://login.tailscale.com/admin/settings/general") ) +var namesByAddr map[netip.Addr]string + func main() { flag.Parse() + if *resolveNames { + namesByAddr = mustMakeNamesByAddr() + } - namesByAddr := mustMakeNamesByAddr() - dec := json.NewDecoder(os.Stdin) - for { - // Unmarshal the log message containing network traffics. - var msg struct { - Logtail struct { - ID string `json:"id"` - } `json:"logtail"` - netlogtype.Message + // The logic handles a stream of arbitrary JSON. + // So long as a JSON object seems like a network log message, + // then this will unmarshal and print it. + if err := processStream(os.Stdin); err != nil { + if err == io.EOF { + return } - if err := dec.Decode(&msg); err != nil { - if err == io.EOF { - break + log.Fatalf("processStream: %v", err) + } +} + +func processStream(r io.Reader) (err error) { + defer try.Handle(&err) + dec := jsonv2.NewDecoder(os.Stdin) + for { + processValue(dec) + } +} + +func processValue(dec *jsonv2.Decoder) { + switch dec.PeekKind() { + case '[': + processArray(dec) + case '{': + processObject(dec) + default: + try.E(dec.SkipValue()) + } +} + +func processArray(dec *jsonv2.Decoder) { + try.E1(dec.ReadToken()) // parse '[' + for dec.PeekKind() != ']' { + processValue(dec) + } + try.E1(dec.ReadToken()) // parse ']' +} + +func processObject(dec *jsonv2.Decoder) { + var hasTraffic bool + var rawMsg []byte + try.E1(dec.ReadToken()) // parse '{' + for dec.PeekKind() != '}' { + // Capture any members that could belong to a network log message. + switch name := try.E1(dec.ReadToken()); name.String() { + case "virtualTraffic", "subnetTraffic", "exitTraffic", "physicalTraffic": + hasTraffic = true + fallthrough + case "logtail", "nodeId", "logged", "start", "end": + if len(rawMsg) == 0 { + rawMsg = append(rawMsg, '{') + } else { + rawMsg = append(rawMsg[:len(rawMsg)-1], ',') } - log.Fatalf("UnmarshalNext: %v", err) - } - if len(msg.VirtualTraffic)+len(msg.SubnetTraffic)+len(msg.ExitTraffic)+len(msg.PhysicalTraffic) == 0 { - continue // nothing to print + rawMsg = append(append(append(rawMsg, '"'), name.String()...), '"') + rawMsg = append(rawMsg, ':') + rawMsg = append(rawMsg, try.E1(dec.ReadValue())...) + rawMsg = append(rawMsg, '}') + default: + processValue(dec) } + } + try.E1(dec.ReadToken()) // parse '}' - // Construct a table of network traffic per connection. - rows := [][7]string{{3: "Tx[P/s]", 4: "Tx[B/s]", 5: "Rx[P/s]", 6: "Rx[B/s]"}} - duration := msg.End.Sub(msg.Start) - addRows := func(heading string, traffic []netlogtype.ConnectionCounts) { - if len(traffic) == 0 { - return - } - slices.SortFunc(traffic, func(x, y netlogtype.ConnectionCounts) bool { - nx := x.TxPackets + x.TxBytes + x.RxPackets + x.RxBytes - ny := y.TxPackets + y.TxBytes + y.RxPackets + y.RxBytes - return nx > ny - }) - var sum netlogtype.Counts - for _, cc := range traffic { - sum = sum.Add(cc.Counts) - } - rows = append(rows, [7]string{ - 0: heading + ":", - 3: formatSI(float64(sum.TxPackets) / duration.Seconds()), - 4: formatIEC(float64(sum.TxBytes) / duration.Seconds()), - 5: formatSI(float64(sum.RxPackets) / duration.Seconds()), - 6: formatIEC(float64(sum.RxBytes) / duration.Seconds()), - }) - if len(traffic) == 1 && traffic[0].Connection.IsZero() { - return // this is already a summary counts + // If this appears to be a network log message, then unmarshal and print it. + if hasTraffic { + var msg message + try.E(jsonv2.Unmarshal(rawMsg, &msg)) + printMessage(msg) + } +} + +type message struct { + Logtail struct { + ID logtail.PublicID `json:"id"` + Logged time.Time `json:"server_time"` + } `json:"logtail"` + Logged time.Time `json:"logged"` + netlogtype.Message +} + +func printMessage(msg message) { + // Construct a table of network traffic per connection. + rows := [][7]string{{3: "Tx[P/s]", 4: "Tx[B/s]", 5: "Rx[P/s]", 6: "Rx[B/s]"}} + duration := msg.End.Sub(msg.Start) + addRows := func(heading string, traffic []netlogtype.ConnectionCounts) { + if len(traffic) == 0 { + return + } + slices.SortFunc(traffic, func(x, y netlogtype.ConnectionCounts) bool { + nx := x.TxPackets + x.TxBytes + x.RxPackets + x.RxBytes + ny := y.TxPackets + y.TxBytes + y.RxPackets + y.RxBytes + return nx > ny + }) + var sum netlogtype.Counts + for _, cc := range traffic { + sum = sum.Add(cc.Counts) + } + rows = append(rows, [7]string{ + 0: heading + ":", + 3: formatSI(float64(sum.TxPackets) / duration.Seconds()), + 4: formatIEC(float64(sum.TxBytes) / duration.Seconds()), + 5: formatSI(float64(sum.RxPackets) / duration.Seconds()), + 6: formatIEC(float64(sum.RxBytes) / duration.Seconds()), + }) + if len(traffic) == 1 && traffic[0].Connection.IsZero() { + return // this is already a summary counts + } + formatAddrPort := func(a netip.AddrPort) string { + if !a.IsValid() { + return "" } - formatAddrPort := func(a netip.AddrPort) string { - if !a.IsValid() { - return "" - } - if name, ok := namesByAddr[a.Addr()]; ok { - if a.Port() == 0 { - return name - } - return name + ":" + strconv.Itoa(int(a.Port())) - } + if name, ok := namesByAddr[a.Addr()]; ok { if a.Port() == 0 { - return a.Addr().String() + return name } - return a.String() + return name + ":" + strconv.Itoa(int(a.Port())) } - for _, cc := range traffic { - row := [7]string{ - 0: " ", - 1: formatAddrPort(cc.Src), - 2: formatAddrPort(cc.Dst), - 3: formatSI(float64(cc.TxPackets) / duration.Seconds()), - 4: formatIEC(float64(cc.TxBytes) / duration.Seconds()), - 5: formatSI(float64(cc.RxPackets) / duration.Seconds()), - 6: formatIEC(float64(cc.RxBytes) / duration.Seconds()), - } - if cc.Proto > 0 { - row[0] += cc.Proto.String() + ":" - } - rows = append(rows, row) + if a.Port() == 0 { + return a.Addr().String() } + return a.String() } - addRows("VirtualTraffic", msg.VirtualTraffic) - addRows("SubnetTraffic", msg.SubnetTraffic) - addRows("ExitTraffic", msg.ExitTraffic) - addRows("PhysicalTraffic", msg.PhysicalTraffic) - - // Compute the maximum width of each field. - var maxWidths [7]int - for _, row := range rows { - for i, col := range row { - if maxWidths[i] < len(col) && !(i == 0 && !strings.HasPrefix(col, " ")) { - maxWidths[i] = len(col) - } + for _, cc := range traffic { + row := [7]string{ + 0: " ", + 1: formatAddrPort(cc.Src), + 2: formatAddrPort(cc.Dst), + 3: formatSI(float64(cc.TxPackets) / duration.Seconds()), + 4: formatIEC(float64(cc.TxBytes) / duration.Seconds()), + 5: formatSI(float64(cc.RxPackets) / duration.Seconds()), + 6: formatIEC(float64(cc.RxBytes) / duration.Seconds()), } + if cc.Proto > 0 { + row[0] += cc.Proto.String() + ":" + } + rows = append(rows, row) } - var maxSum int - for _, n := range maxWidths { - maxSum += n - } + } + addRows("VirtualTraffic", msg.VirtualTraffic) + addRows("SubnetTraffic", msg.SubnetTraffic) + addRows("ExitTraffic", msg.ExitTraffic) + addRows("PhysicalTraffic", msg.PhysicalTraffic) - // Output a table of network traffic per connection. - line := make([]byte, 0, maxSum+len(" ")+len(" -> ")+4*len(" ")) - line = appendRepeatByte(line, '=', cap(line)) - fmt.Println(string(line)) - if msg.Logtail.ID != "" { - fmt.Printf("ID: %s\n", msg.Logtail.ID) + // Compute the maximum width of each field. + var maxWidths [7]int + for _, row := range rows { + for i, col := range row { + if maxWidths[i] < len(col) && !(i == 0 && !strings.HasPrefix(col, " ")) { + maxWidths[i] = len(col) + } } - fmt.Printf("Time: %s (%s)\n", msg.Start.Round(time.Millisecond).Format(time.RFC3339Nano), duration.Round(time.Millisecond)) - for i, row := range rows { - line = line[:0] - isHeading := !strings.HasPrefix(row[0], " ") - for j, col := range row { - if isHeading && j == 0 { - col = "" // headings will be printed later - } - switch j { - case 0, 2: // left justified - line = append(line, col...) - line = appendRepeatByte(line, ' ', maxWidths[j]-len(col)) - case 1, 3, 4, 5, 6: // right justified - line = appendRepeatByte(line, ' ', maxWidths[j]-len(col)) - line = append(line, col...) - } - switch j { - case 0: - line = append(line, " "...) - case 1: - if row[1] == "" && row[2] == "" { - line = append(line, " "...) - } else { - line = append(line, " -> "...) - } - case 2, 3, 4, 5: - line = append(line, " "...) - } + } + var maxSum int + for _, n := range maxWidths { + maxSum += n + } + + // Output a table of network traffic per connection. + line := make([]byte, 0, maxSum+len(" ")+len(" -> ")+4*len(" ")) + line = appendRepeatByte(line, '=', cap(line)) + fmt.Println(string(line)) + if !msg.Logtail.ID.IsZero() { + fmt.Printf("LogID: %s\n", msg.Logtail.ID) + } + if msg.NodeID != "" { + fmt.Printf("NodeID: %s\n", msg.NodeID) + } + formatTime := func(t time.Time) string { + return t.In(time.Local).Format("2006-01-02 15:04:05.000") + } + switch { + case !msg.Logged.IsZero(): + fmt.Printf("Logged: %s\n", formatTime(msg.Logged)) + case !msg.Logtail.Logged.IsZero(): + fmt.Printf("Logged: %s\n", formatTime(msg.Logtail.Logged)) + } + fmt.Printf("Window: %s (%0.3fs)\n", formatTime(msg.Start), duration.Seconds()) + for i, row := range rows { + line = line[:0] + isHeading := !strings.HasPrefix(row[0], " ") + for j, col := range row { + if isHeading && j == 0 { + col = "" // headings will be printed later + } + switch j { + case 0, 2: // left justified + line = append(line, col...) + line = appendRepeatByte(line, ' ', maxWidths[j]-len(col)) + case 1, 3, 4, 5, 6: // right justified + line = appendRepeatByte(line, ' ', maxWidths[j]-len(col)) + line = append(line, col...) } - switch { - case i == 0: // print dashed-line table heading - line = appendRepeatByte(line[:0], '-', maxWidths[0]+len(" ")+maxWidths[1]+len(" -> ")+maxWidths[2])[:cap(line)] - case isHeading: - copy(line[:], row[0]) + switch j { + case 0: + line = append(line, " "...) + case 1: + if row[1] == "" && row[2] == "" { + line = append(line, " "...) + } else { + line = append(line, " -> "...) + } + case 2, 3, 4, 5: + line = append(line, " "...) } - fmt.Println(string(line)) } + switch { + case i == 0: // print dashed-line table heading + line = appendRepeatByte(line[:0], '-', maxWidths[0]+len(" ")+maxWidths[1]+len(" -> ")+maxWidths[2])[:cap(line)] + case isHeading: + copy(line[:], row[0]) + } + fmt.Println(string(line)) } } func mustMakeNamesByAddr() map[netip.Addr]string { switch { - case !*resolveNames: - return nil case *apiKey == "": log.Fatalf("--api-key must be specified with --resolve-names") case *tailnetName == "": diff --git a/go.mod b/go.mod index 3ce2e0cda..a52e717c3 100644 --- a/go.mod +++ b/go.mod @@ -17,9 +17,11 @@ require ( github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf github.com/creack/pty v1.1.17 github.com/dave/jennifer v1.4.1 + github.com/dsnet/try v0.0.3 github.com/evanw/esbuild v0.14.53 github.com/frankban/quicktest v1.14.0 github.com/fxamacker/cbor/v2 v2.4.0 + github.com/go-json-experiment/json v0.0.0-20221017203807-c5ed296b8c92 github.com/go-ole/go-ole v1.2.6 github.com/godbus/dbus/v5 v5.0.6 github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da diff --git a/go.sum b/go.sum index 5a8af70e0..6788b71cf 100644 --- a/go.sum +++ b/go.sum @@ -261,6 +261,8 @@ github.com/docker/docker v20.10.16+incompatible h1:2Db6ZR/+FUR3hqPMwnogOPHFn405c github.com/docker/docker v20.10.16+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/docker-credential-helpers v0.6.4 h1:axCks+yV+2MR3/kZhAmy07yC56WZ2Pwu/fKWtKuZB0o= github.com/docker/docker-credential-helpers v0.6.4/go.mod h1:ofX3UI0Gz1TteYBjtgs07O36Pyasyp66D2uKT7H8W1c= +github.com/dsnet/try v0.0.3 h1:ptR59SsrcFUYbT/FhAbKTV6iLkeD6O18qfIWRml2fqI= +github.com/dsnet/try v0.0.3/go.mod h1:WBM8tRpUmnXXhY1U6/S8dt6UWdHTQ7y8A5YSkRCkq40= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg= @@ -328,6 +330,8 @@ github.com/go-git/go-git/v5 v5.4.2/go.mod h1:gQ1kArt6d+n+BGd+/B/I74HwRTLhth2+zti github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= +github.com/go-json-experiment/json v0.0.0-20221017203807-c5ed296b8c92 h1:eoE7yxLELqDQVlHGoYYxXLFZqF8NcdOnrukTm4ObJaY= +github.com/go-json-experiment/json v0.0.0-20221017203807-c5ed296b8c92/go.mod h1:I+I5/LT2lLP0eZsBNaVDrOrYASx9h7o7mRHmy+535/A= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY=