diff --git a/tstest/integration/integration_test.go b/tstest/integration/integration_test.go index 5779339ba..a3bd7186e 100644 --- a/tstest/integration/integration_test.go +++ b/tstest/integration/integration_test.go @@ -6,6 +6,7 @@ package integration import ( "bytes" + "context" crand "crypto/rand" "crypto/tls" "encoding/json" @@ -46,11 +47,13 @@ import ( var ( verboseLogCatcher = flag.Bool("verbose-log-catcher", false, "verbose log catcher logging") + verboseTailscaled = flag.Bool("verbose-tailscaled", false, "verbose tailscaled logging") ) var mainError atomic.Value // of error func TestMain(m *testing.M) { + flag.Parse() v := m.Run() if v != 0 { os.Exit(v) @@ -253,19 +256,27 @@ func TestAddPingRequest(t *testing.T) { } nodeKey := nodes[0].Key - pr := &tailcfg.PingRequest{URL: waitPing.URL, Log: true} - ok := env.Control.AddPingRequest(nodeKey, pr) - if !ok { - t.Fatalf("no node found with NodeKey %v in AddPingRequest", nodeKey) - } + for i := 0; i < 10; i++ { + t.Logf("ping %v ...", i) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + if err := env.Control.AwaitNodeInMapRequest(ctx, nodeKey); err != nil { + t.Fatal(err) + } + cancel() + pr := &tailcfg.PingRequest{URL: fmt.Sprintf("%s/ping-%d", waitPing.URL, i), Log: true} + ok := env.Control.AddPingRequest(nodeKey, pr) + if !ok { + t.Fatalf("no node found with NodeKey %v in AddPingRequest", nodeKey) + } - // Wait for PingRequest to come back - pingTimeout := time.NewTimer(10 * time.Second) - select { - case <-gotPing: - pingTimeout.Stop() - case <-pingTimeout.C: - t.Error("didn't get PingRequest from tailscaled") + // Wait for PingRequest to come back + pingTimeout := time.NewTimer(2 * time.Second) + select { + case <-gotPing: + pingTimeout.Stop() + case <-pingTimeout.C: + t.Fatal("didn't get PingRequest from tailscaled") + } } } @@ -384,6 +395,10 @@ func (n *testNode) StartDaemon(t testing.TB) *Daemon { "HTTP_PROXY="+n.env.TrafficTrapServer.URL, "HTTPS_PROXY="+n.env.TrafficTrapServer.URL, ) + if *verboseTailscaled { + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stdout + } if err := cmd.Start(); err != nil { t.Fatalf("starting tailscaled: %v", err) } diff --git a/tstest/integration/testcontrol/testcontrol.go b/tstest/integration/testcontrol/testcontrol.go index 2e8ce82b9..fd118ea24 100644 --- a/tstest/integration/testcontrol/testcontrol.go +++ b/tstest/integration/testcontrol/testcontrol.go @@ -7,6 +7,7 @@ package testcontrol import ( "bytes" + "context" crand "crypto/rand" "encoding/binary" "encoding/json" @@ -46,6 +47,7 @@ type Server struct { mux *http.ServeMux mu sync.Mutex + cond *sync.Cond // lazily initialized by condLocked pubKey wgkey.Key privKey wgkey.Private nodes map[tailcfg.NodeKey]*tailcfg.Node @@ -68,6 +70,47 @@ func (s *Server) NumNodes() int { return len(s.nodes) } +// condLocked lazily initializes and returns s.cond. +// s.mu must be held. +func (s *Server) condLocked() *sync.Cond { + if s.cond == nil { + s.cond = sync.NewCond(&s.mu) + } + return s.cond +} + +// AwaitNodeInMapRequest waits for node k to be stuck in a map poll. +// It returns an error if and only if the context is done first. +func (s *Server) AwaitNodeInMapRequest(ctx context.Context, k tailcfg.NodeKey) error { + s.mu.Lock() + defer s.mu.Unlock() + cond := s.condLocked() + + done := make(chan struct{}) + defer close(done) + go func() { + select { + case <-done: + case <-ctx.Done(): + cond.Broadcast() + } + }() + + for { + node := s.nodeLocked(k) + if node == nil { + return errors.New("unknown node key") + } + if _, ok := s.updates[node.ID]; ok { + return nil + } + cond.Wait() + if err := ctx.Err(); err != nil { + return err + } + } +} + // AddPingRequest sends the ping pr to nodeKeyDst. It reports whether it did so. That is, // it reports whether nodeKeyDst was connected. func (s *Server) AddPingRequest(nodeKeyDst tailcfg.NodeKey, pr *tailcfg.PingRequest) bool { @@ -85,8 +128,7 @@ func (s *Server) AddPingRequest(nodeKeyDst tailcfg.NodeKey, pr *tailcfg.PingRequ s.pingReqsToAdd[nodeKeyDst] = pr nodeID := node.ID oldUpdatesCh := s.updates[nodeID] - sendUpdate(oldUpdatesCh, updateDebugInjection) - return true + return sendUpdate(oldUpdatesCh, updateDebugInjection) } type AuthPath struct { @@ -414,17 +456,19 @@ func (s *Server) updateLocked(source string, peers []tailcfg.NodeID) { } // sendUpdate sends updateType to dst if dst is non-nil and -// has capacity. -func sendUpdate(dst chan<- updateType, updateType updateType) { +// has capacity. It reports whether a value was sent. +func sendUpdate(dst chan<- updateType, updateType updateType) bool { if dst == nil { - return + return false } // The dst channel has a buffer size of 1. // If we fail to insert an update into the buffer that // means there is already an update pending. select { case dst <- updateType: + return true default: + return false } } @@ -489,6 +533,7 @@ func (s *Server) serveMap(w http.ResponseWriter, r *http.Request, mkey tailcfg.M sendUpdate(oldUpdatesCh, updateSelfChanged) } s.updateLocked("serveMap", peersToUpdate) + s.condLocked().Broadcast() s.mu.Unlock() // ReadOnly implies no streaming, as it doesn't