Skip to content

Commit f3b75c9

Browse files
committed
chore(vpn): upsert agents with their network status
1 parent 469ff7a commit f3b75c9

File tree

5 files changed

+565
-157
lines changed

5 files changed

+565
-157
lines changed

vpn/client.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,15 @@ import (
55
"net/http"
66
"net/netip"
77
"net/url"
8+
"time"
89

910
"golang.org/x/xerrors"
1011
"nhooyr.io/websocket"
12+
"tailscale.com/ipn/ipnstate"
1113
"tailscale.com/net/dns"
1214
"tailscale.com/wgengine/router"
1315

16+
"github.com/google/uuid"
1417
"github.com/tailscale/wireguard-go/tun"
1518

1619
"cdr.dev/slog"
@@ -23,6 +26,8 @@ import (
2326

2427
// Conn is the connection handle the tunnel keeps for querying workspace
// state and per-agent network status.
type Conn interface {
2528
// CurrentWorkspaceState returns the current workspace state as a
// tailnet.WorkspaceUpdate, or an error if it cannot be obtained.
CurrentWorkspaceState() (tailnet.WorkspaceUpdate, error)
29+
// GetPeerDiagnostics returns diagnostics for the peer with the given ID.
// The tunnel reads the last WireGuard handshake time from the result.
GetPeerDiagnostics(peerID uuid.UUID) tailnet.PeerDiagnostics
30+
// Ping pings the given address. The tunnel uses the returned duration as
// the agent's latency.
// NOTE(review): the meaning of the bool result is not visible here —
// confirm against the implementation.
Ping(ctx context.Context, ip netip.Addr) (time.Duration, bool, *ipnstate.PingResult, error)
2631
// Close closes the connection.
Close() error
2732
}
2833

vpn/tunnel.go

Lines changed: 161 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,36 @@ import (
77
"fmt"
88
"io"
99
"net/http"
10+
"net/netip"
1011
"net/url"
1112
"reflect"
1213
"strconv"
1314
"sync"
15+
"time"
1416
"unicode"
1517

18+
"golang.org/x/exp/maps"
1619
"golang.org/x/xerrors"
20+
"google.golang.org/protobuf/types/known/durationpb"
21+
"google.golang.org/protobuf/types/known/timestamppb"
1722
"tailscale.com/net/dns"
23+
"tailscale.com/util/dnsname"
1824
"tailscale.com/wgengine/router"
1925

26+
"github.com/google/uuid"
27+
2028
"cdr.dev/slog"
2129
"github.com/coder/coder/v2/coderd/util/ptr"
2230
"github.com/coder/coder/v2/tailnet"
2331
)
2432

33+
// netStatusInterval is the interval at which the tunnel sends network status updates to the manager.
34+
const netStatusInterval = 30 * time.Second
35+
2536
type Tunnel struct {
2637
speaker[*TunnelMessage, *ManagerMessage, ManagerMessage]
2738
ctx context.Context
39+
netLoopDone chan struct{}
2840
requestLoopDone chan struct{}
2941

3042
logger slog.Logger
@@ -35,6 +47,10 @@ type Tunnel struct {
3547
client Client
3648
conn Conn
3749

50+
mu sync.Mutex
51+
// agents contains the agents that are currently connected to the tunnel.
52+
agents map[uuid.UUID]*tailnet.Agent
53+
3854
// clientLogger is a separate logger than `logger` when the `UseAsLogger`
3955
// option is used, to avoid the tunnel using itself as a sink for its own
4056
// logs, which could lead to deadlocks.
@@ -66,14 +82,17 @@ func NewTunnel(
6682
logger: logger,
6783
clientLogger: logger,
6884
requestLoopDone: make(chan struct{}),
85+
netLoopDone: make(chan struct{}),
6986
client: client,
87+
agents: make(map[uuid.UUID]*tailnet.Agent),
7088
}
7189

7290
for _, opt := range opts {
7391
opt(t)
7492
}
7593
t.speaker.start()
7694
go t.requestLoop()
95+
go t.netStatusLoop()
7796
return t, nil
7897
}
7998

@@ -102,6 +121,20 @@ func (t *Tunnel) requestLoop() {
102121
}
103122
}
104123

124+
func (t *Tunnel) netStatusLoop() {
125+
ticker := time.NewTicker(netStatusInterval)
126+
defer ticker.Stop()
127+
defer close(t.netLoopDone)
128+
for {
129+
select {
130+
case <-t.ctx.Done():
131+
return
132+
case <-ticker.C:
133+
t.sendAgentUpdate()
134+
}
135+
}
136+
}
137+
105138
// handleRPC handles unary RPCs from the manager.
106139
func (t *Tunnel) handleRPC(req *ManagerMessage, msgID uint64) *TunnelMessage {
107140
resp := &TunnelMessage{}
@@ -112,8 +145,12 @@ func (t *Tunnel) handleRPC(req *ManagerMessage, msgID uint64) *TunnelMessage {
112145
if err != nil {
113146
t.logger.Critical(t.ctx, "failed to get current workspace state", slog.Error(err))
114147
}
148+
update, err := t.createPeerUpdate(state)
149+
if err != nil {
150+
t.logger.Error(t.ctx, "failed to populate agent network info", slog.Error(err))
151+
}
115152
resp.Msg = &TunnelMessage_PeerUpdate{
116-
PeerUpdate: convertWorkspaceUpdate(state),
153+
PeerUpdate: update,
117154
}
118155
return resp
119156
case *ManagerMessage_Start:
@@ -194,9 +231,13 @@ func (t *Tunnel) ApplyNetworkSettings(ctx context.Context, ns *NetworkSettingsRe
194231
}
195232

196233
func (t *Tunnel) Update(update tailnet.WorkspaceUpdate) error {
234+
peerUpdate, err := t.createPeerUpdate(update)
235+
if err != nil {
236+
t.logger.Error(t.ctx, "failed to populate agent network info", slog.Error(err))
237+
}
197238
msg := &TunnelMessage{
198239
Msg: &TunnelMessage_PeerUpdate{
199-
PeerUpdate: convertWorkspaceUpdate(update),
240+
PeerUpdate: peerUpdate,
200241
},
201242
}
202243
select {
@@ -293,35 +334,30 @@ func sinkEntryToPb(e slog.SinkEntry) *Log {
293334
return l
294335
}
295336

296-
func convertWorkspaceUpdate(update tailnet.WorkspaceUpdate) *PeerUpdate {
337+
// createPeerUpdate creates a PeerUpdate message from a workspace update, populating
338+
// the network status of the agents.
339+
func (t *Tunnel) createPeerUpdate(update tailnet.WorkspaceUpdate) (*PeerUpdate, error) {
297340
out := &PeerUpdate{
298341
UpsertedWorkspaces: make([]*Workspace, len(update.UpsertedWorkspaces)),
299342
UpsertedAgents: make([]*Agent, len(update.UpsertedAgents)),
300343
DeletedWorkspaces: make([]*Workspace, len(update.DeletedWorkspaces)),
301344
DeletedAgents: make([]*Agent, len(update.DeletedAgents)),
302345
}
346+
347+
t.saveUpdate(update)
348+
303349
for i, ws := range update.UpsertedWorkspaces {
304350
out.UpsertedWorkspaces[i] = &Workspace{
305351
Id: tailnet.UUIDToByteSlice(ws.ID),
306352
Name: ws.Name,
307353
Status: Workspace_Status(ws.Status),
308354
}
309355
}
310-
for i, agent := range update.UpsertedAgents {
311-
fqdn := make([]string, 0, len(agent.Hosts))
312-
for name := range agent.Hosts {
313-
fqdn = append(fqdn, name.WithTrailingDot())
314-
}
315-
out.UpsertedAgents[i] = &Agent{
316-
Id: tailnet.UUIDToByteSlice(agent.ID),
317-
Name: agent.Name,
318-
WorkspaceId: tailnet.UUIDToByteSlice(agent.WorkspaceID),
319-
Fqdn: fqdn,
320-
IpAddrs: []string{tailnet.CoderServicePrefix.AddrFromUUID(agent.ID).String()},
321-
// TODO: Populate
322-
LastHandshake: nil,
323-
}
356+
upsertedAgents, err := t.populateAgents(update.UpsertedAgents)
357+
if err != nil {
358+
return nil, xerrors.Errorf("failed to populate agent network info: %w", err)
324359
}
360+
out.UpsertedAgents = upsertedAgents
325361
for i, ws := range update.DeletedWorkspaces {
326362
out.DeletedWorkspaces[i] = &Workspace{
327363
Id: tailnet.UUIDToByteSlice(ws.ID),
@@ -335,16 +371,106 @@ func convertWorkspaceUpdate(update tailnet.WorkspaceUpdate) *PeerUpdate {
335371
fqdn = append(fqdn, name.WithTrailingDot())
336372
}
337373
out.DeletedAgents[i] = &Agent{
374+
Id: tailnet.UUIDToByteSlice(agent.ID),
375+
Name: agent.Name,
376+
WorkspaceId: tailnet.UUIDToByteSlice(agent.WorkspaceID),
377+
Fqdn: fqdn,
378+
IpAddrs: hostsToIPStrings(agent.Hosts),
379+
LastHandshake: nil,
380+
Latency: nil,
381+
}
382+
}
383+
return out, nil
384+
}
385+
386+
// Given a list of agents, populate their network info, and return them as proto agents.
387+
func (t *Tunnel) populateAgents(agents []*tailnet.Agent) ([]*Agent, error) {
388+
if t.conn == nil {
389+
return nil, xerrors.New("no active connection")
390+
}
391+
392+
out := make([]*Agent, 0, len(agents))
393+
var wg sync.WaitGroup
394+
pingCtx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
395+
defer cancelFunc()
396+
397+
for _, agent := range agents {
398+
fqdn := make([]string, 0, len(agent.Hosts))
399+
for name := range agent.Hosts {
400+
fqdn = append(fqdn, name.WithTrailingDot())
401+
}
402+
protoAgent := &Agent{
338403
Id: tailnet.UUIDToByteSlice(agent.ID),
339404
Name: agent.Name,
340405
WorkspaceId: tailnet.UUIDToByteSlice(agent.WorkspaceID),
341406
Fqdn: fqdn,
342-
IpAddrs: []string{tailnet.CoderServicePrefix.AddrFromUUID(agent.ID).String()},
343-
// TODO: Populate
344-
LastHandshake: nil,
407+
IpAddrs: hostsToIPStrings(agent.Hosts),
345408
}
409+
agentIP := tailnet.CoderServicePrefix.AddrFromUUID(agent.ID)
410+
wg.Add(1)
411+
go func() {
412+
defer wg.Done()
413+
duration, _, _, err := t.conn.Ping(pingCtx, agentIP)
414+
if err != nil {
415+
return
416+
}
417+
protoAgent.Latency = durationpb.New(duration)
418+
}()
419+
diags := t.conn.GetPeerDiagnostics(agent.ID)
420+
//nolint:revive // outdated rule
421+
protoAgent.LastHandshake = timestamppb.New(diags.LastWireguardHandshake)
422+
out = append(out, protoAgent)
423+
}
424+
wg.Wait()
425+
426+
return out, nil
427+
}
428+
429+
// saveUpdate saves the workspace update to the tunnel's state, such that it can
430+
// be used to populate automated peer updates.
431+
func (t *Tunnel) saveUpdate(update tailnet.WorkspaceUpdate) {
432+
t.mu.Lock()
433+
defer t.mu.Unlock()
434+
435+
for _, agent := range update.UpsertedAgents {
436+
t.agents[agent.ID] = agent
437+
}
438+
for _, agent := range update.DeletedAgents {
439+
delete(t.agents, agent.ID)
440+
}
441+
}
442+
443+
// sendAgentUpdate sends a peer update message to the manager with the current
444+
// state of the agents, including the latest network status.
445+
func (t *Tunnel) sendAgentUpdate() {
446+
// The lock must be held until we send the message,
447+
// else we risk upserting a deleted agent.
448+
t.mu.Lock()
449+
defer t.mu.Unlock()
450+
451+
upsertedAgents, err := t.populateAgents(maps.Values(t.agents))
452+
if err != nil {
453+
t.logger.Error(t.ctx, "failed to produce agent network status update", slog.Error(err))
454+
return
455+
}
456+
457+
if len(upsertedAgents) == 0 {
458+
return
459+
}
460+
461+
msg := &TunnelMessage{
462+
Msg: &TunnelMessage_PeerUpdate{
463+
PeerUpdate: &PeerUpdate{
464+
UpsertedAgents: upsertedAgents,
465+
},
466+
},
467+
}
468+
469+
select {
470+
case <-t.ctx.Done():
471+
return
472+
case t.sendCh <- msg:
346473
}
347-
return out
348474
}
349475

350476
// the following are taken from sloghuman:
@@ -404,3 +530,17 @@ func quote(key string) string {
404530
}
405531
return quoted
406532
}
533+
534+
func hostsToIPStrings(hosts map[dnsname.FQDN][]netip.Addr) []string {
535+
seen := make(map[netip.Addr]struct{})
536+
var result []string
537+
for _, inner := range hosts {
538+
for _, elem := range inner {
539+
if _, exists := seen[elem]; !exists {
540+
seen[elem] = struct{}{}
541+
result = append(result, elem.String())
542+
}
543+
}
544+
}
545+
return result
546+
}

0 commit comments

Comments
 (0)