diff --git a/tstest/integration/integration_test.go b/tstest/integration/integration_test.go index 4c264dcff..19ef62f13 100644 --- a/tstest/integration/integration_test.go +++ b/tstest/integration/integration_test.go @@ -156,13 +156,20 @@ func TestTwoNodes(t *testing.T) { // Create two nodes: n1 := newTestNode(t, env) + n1SocksAddrCh := n1.socks5AddrChan() d1 := n1.StartDaemon(t) defer d1.Kill() n2 := newTestNode(t, env) + n2SocksAddrCh := n2.socks5AddrChan() d2 := n2.StartDaemon(t) defer d2.Kill() + n1Socks := n1.AwaitSocksAddr(t, n1SocksAddrCh) + n2Socks := n1.AwaitSocksAddr(t, n2SocksAddrCh) + t.Logf("node1 SOCKS5 addr: %v", n1Socks) + t.Logf("node2 SOCKS5 addr: %v", n2Socks) + n1.AwaitListening(t) n2.AwaitListening(t) n1.MustUp() @@ -358,6 +365,9 @@ type testNode struct { dir string // temp dir for sock & state sockFile string stateFile string + + mu sync.Mutex + onLogLine []func([]byte) } // newTestNode allocates a temp directory for a new test node. @@ -372,6 +382,85 @@ func newTestNode(t *testing.T, env *testEnv) *testNode { } } +// addLogLineHook registers a hook f to be called on each tailscaled +// log line output. +func (n *testNode) addLogLineHook(f func([]byte)) { + n.mu.Lock() + defer n.mu.Unlock() + n.onLogLine = append(n.onLogLine, f) +} + +// socks5AddrChan returns a channel that receives the address (e.g. "localhost:23874") +// of the node's SOCKS5 listener, once started. +func (n *testNode) socks5AddrChan() <-chan string { + ch := make(chan string, 1) + n.addLogLineHook(func(line []byte) { + const sub = "SOCKS5 listening on " + i := mem.Index(mem.B(line), mem.S(sub)) + if i == -1 { + return + } + addr := string(line)[i+len(sub):] + select { + case ch <- addr: + default: + } + }) + return ch +} + +func (n *testNode) AwaitSocksAddr(t testing.TB, ch <-chan string) string { + t.Helper() + timer := time.NewTimer(10 * time.Second) + defer timer.Stop() + select { + case v := <-ch: + return v + case <-timer.C: + t.Fatal("timeout waiting for node to log its SOCK5 listening address") + panic("unreachable") + } +} + +// nodeOutputParser parses stderr of tailscaled processes, calling the +// per-line callbacks previously registered via +// testNode.addLogLineHook. +type nodeOutputParser struct { + buf bytes.Buffer + n *testNode +} + +func (op *nodeOutputParser) Write(p []byte) (n int, err error) { + n, err = op.buf.Write(p) + op.parseLines() + return +} + +func (op *nodeOutputParser) parseLines() { + n := op.n + buf := op.buf.Bytes() + for len(buf) > 0 { + nl := bytes.IndexByte(buf, '\n') + if nl == -1 { + break + } + line := buf[:nl+1] + buf = buf[nl+1:] + lineTrim := bytes.TrimSpace(line) + + n.mu.Lock() + for _, f := range n.onLogLine { + f(lineTrim) + } + n.mu.Unlock() + } + if len(buf) == 0 { + op.buf.Reset() + } else { + io.CopyN(ioutil.Discard, &op.buf, int64(op.buf.Len()-len(buf))) + } +} + type Daemon struct { Process *os.Process } @@ -398,15 +487,17 @@ func (n *testNode) StartDaemon(t testing.TB) *Daemon { "--tun=userspace-networking", "--state="+n.stateFile, "--socket="+n.sockFile, + "--socks5-server=localhost:0", ) cmd.Env = append(os.Environ(), "TS_LOG_TARGET="+n.env.LogCatcherServer.URL, "HTTP_PROXY="+n.env.TrafficTrapServer.URL, "HTTPS_PROXY="+n.env.TrafficTrapServer.URL, ) + cmd.Stderr = &nodeOutputParser{n: n} if *verboseTailscaled { cmd.Stdout = os.Stdout - cmd.Stderr = os.Stdout + cmd.Stderr = io.MultiWriter(cmd.Stderr, os.Stderr) } if err := cmd.Start(); err != nil { t.Fatalf("starting tailscaled: %v", err)