@@ -7,24 +7,36 @@ import (
7
7
"fmt"
8
8
"io"
9
9
"net/http"
10
+ "net/netip"
10
11
"net/url"
11
12
"reflect"
12
13
"strconv"
13
14
"sync"
15
+ "time"
14
16
"unicode"
15
17
18
+ "golang.org/x/exp/maps"
16
19
"golang.org/x/xerrors"
20
+ "google.golang.org/protobuf/types/known/durationpb"
21
+ "google.golang.org/protobuf/types/known/timestamppb"
17
22
"tailscale.com/net/dns"
23
+ "tailscale.com/util/dnsname"
18
24
"tailscale.com/wgengine/router"
19
25
26
+ "github.com/google/uuid"
27
+
20
28
"cdr.dev/slog"
21
29
"github.com/coder/coder/v2/coderd/util/ptr"
22
30
"github.com/coder/coder/v2/tailnet"
23
31
)
24
32
33
+ // The interval at which the tunnel sends network status updates to the manager.
34
+ const netStatusInterval = 30 * time .Second
35
+
25
36
type Tunnel struct {
26
37
speaker [* TunnelMessage , * ManagerMessage , ManagerMessage ]
27
38
ctx context.Context
39
+ netLoopDone chan struct {}
28
40
requestLoopDone chan struct {}
29
41
30
42
logger slog.Logger
@@ -35,6 +47,10 @@ type Tunnel struct {
35
47
client Client
36
48
conn Conn
37
49
50
+ mu sync.Mutex
51
+ // agents contains the agents that are currently connected to the tunnel.
52
+ agents map [uuid.UUID ]* tailnet.Agent
53
+
38
54
// clientLogger is a separate logger than `logger` when the `UseAsLogger`
39
55
// option is used, to avoid the tunnel using itself as a sink for it's own
40
56
// logs, which could lead to deadlocks.
@@ -66,14 +82,17 @@ func NewTunnel(
66
82
logger : logger ,
67
83
clientLogger : logger ,
68
84
requestLoopDone : make (chan struct {}),
85
+ netLoopDone : make (chan struct {}),
69
86
client : client ,
87
+ agents : make (map [uuid.UUID ]* tailnet.Agent ),
70
88
}
71
89
72
90
for _ , opt := range opts {
73
91
opt (t )
74
92
}
75
93
t .speaker .start ()
76
94
go t .requestLoop ()
95
+ go t .netStatusLoop ()
77
96
return t , nil
78
97
}
79
98
@@ -102,6 +121,20 @@ func (t *Tunnel) requestLoop() {
102
121
}
103
122
}
104
123
124
+ func (t * Tunnel ) netStatusLoop () {
125
+ ticker := time .NewTicker (netStatusInterval )
126
+ defer ticker .Stop ()
127
+ defer close (t .netLoopDone )
128
+ for {
129
+ select {
130
+ case <- t .ctx .Done ():
131
+ return
132
+ case <- ticker .C :
133
+ t .sendAgentUpdate ()
134
+ }
135
+ }
136
+ }
137
+
105
138
// handleRPC handles unary RPCs from the manager.
106
139
func (t * Tunnel ) handleRPC (req * ManagerMessage , msgID uint64 ) * TunnelMessage {
107
140
resp := & TunnelMessage {}
@@ -112,8 +145,12 @@ func (t *Tunnel) handleRPC(req *ManagerMessage, msgID uint64) *TunnelMessage {
112
145
if err != nil {
113
146
t .logger .Critical (t .ctx , "failed to get current workspace state" , slog .Error (err ))
114
147
}
148
+ update , err := t .createPeerUpdate (state )
149
+ if err != nil {
150
+ t .logger .Error (t .ctx , "failed to populate agent network info" , slog .Error (err ))
151
+ }
115
152
resp .Msg = & TunnelMessage_PeerUpdate {
116
- PeerUpdate : convertWorkspaceUpdate ( state ) ,
153
+ PeerUpdate : update ,
117
154
}
118
155
return resp
119
156
case * ManagerMessage_Start :
@@ -194,9 +231,13 @@ func (t *Tunnel) ApplyNetworkSettings(ctx context.Context, ns *NetworkSettingsRe
194
231
}
195
232
196
233
func (t * Tunnel ) Update (update tailnet.WorkspaceUpdate ) error {
234
+ peerUpdate , err := t .createPeerUpdate (update )
235
+ if err != nil {
236
+ t .logger .Error (t .ctx , "failed to populate agent network info" , slog .Error (err ))
237
+ }
197
238
msg := & TunnelMessage {
198
239
Msg : & TunnelMessage_PeerUpdate {
199
- PeerUpdate : convertWorkspaceUpdate ( update ) ,
240
+ PeerUpdate : peerUpdate ,
200
241
},
201
242
}
202
243
select {
@@ -293,35 +334,30 @@ func sinkEntryToPb(e slog.SinkEntry) *Log {
293
334
return l
294
335
}
295
336
296
- func convertWorkspaceUpdate (update tailnet.WorkspaceUpdate ) * PeerUpdate {
337
+ // createPeerUpdate creates a PeerUpdate message from a workspace update, populating
338
+ // the network status of the agents.
339
+ func (t * Tunnel ) createPeerUpdate (update tailnet.WorkspaceUpdate ) (* PeerUpdate , error ) {
297
340
out := & PeerUpdate {
298
341
UpsertedWorkspaces : make ([]* Workspace , len (update .UpsertedWorkspaces )),
299
342
UpsertedAgents : make ([]* Agent , len (update .UpsertedAgents )),
300
343
DeletedWorkspaces : make ([]* Workspace , len (update .DeletedWorkspaces )),
301
344
DeletedAgents : make ([]* Agent , len (update .DeletedAgents )),
302
345
}
346
+
347
+ t .saveUpdate (update )
348
+
303
349
for i , ws := range update .UpsertedWorkspaces {
304
350
out .UpsertedWorkspaces [i ] = & Workspace {
305
351
Id : tailnet .UUIDToByteSlice (ws .ID ),
306
352
Name : ws .Name ,
307
353
Status : Workspace_Status (ws .Status ),
308
354
}
309
355
}
310
- for i , agent := range update .UpsertedAgents {
311
- fqdn := make ([]string , 0 , len (agent .Hosts ))
312
- for name := range agent .Hosts {
313
- fqdn = append (fqdn , name .WithTrailingDot ())
314
- }
315
- out .UpsertedAgents [i ] = & Agent {
316
- Id : tailnet .UUIDToByteSlice (agent .ID ),
317
- Name : agent .Name ,
318
- WorkspaceId : tailnet .UUIDToByteSlice (agent .WorkspaceID ),
319
- Fqdn : fqdn ,
320
- IpAddrs : []string {tailnet .CoderServicePrefix .AddrFromUUID (agent .ID ).String ()},
321
- // TODO: Populate
322
- LastHandshake : nil ,
323
- }
356
+ upsertedAgents , err := t .populateAgents (update .UpsertedAgents )
357
+ if err != nil {
358
+ return nil , xerrors .Errorf ("failed to populate agent network info: %w" , err )
324
359
}
360
+ out .UpsertedAgents = upsertedAgents
325
361
for i , ws := range update .DeletedWorkspaces {
326
362
out .DeletedWorkspaces [i ] = & Workspace {
327
363
Id : tailnet .UUIDToByteSlice (ws .ID ),
@@ -335,16 +371,106 @@ func convertWorkspaceUpdate(update tailnet.WorkspaceUpdate) *PeerUpdate {
335
371
fqdn = append (fqdn , name .WithTrailingDot ())
336
372
}
337
373
out .DeletedAgents [i ] = & Agent {
374
+ Id : tailnet .UUIDToByteSlice (agent .ID ),
375
+ Name : agent .Name ,
376
+ WorkspaceId : tailnet .UUIDToByteSlice (agent .WorkspaceID ),
377
+ Fqdn : fqdn ,
378
+ IpAddrs : hostsToIPStrings (agent .Hosts ),
379
+ LastHandshake : nil ,
380
+ Latency : nil ,
381
+ }
382
+ }
383
+ return out , nil
384
+ }
385
+
386
+ // Given a list of agents, populate their network info, and return them as proto agents.
387
+ func (t * Tunnel ) populateAgents (agents []* tailnet.Agent ) ([]* Agent , error ) {
388
+ if t .conn == nil {
389
+ return nil , xerrors .New ("no active connection" )
390
+ }
391
+
392
+ out := make ([]* Agent , 0 , len (agents ))
393
+ var wg sync.WaitGroup
394
+ pingCtx , cancelFunc := context .WithTimeout (context .Background (), 5 * time .Second )
395
+ defer cancelFunc ()
396
+
397
+ for _ , agent := range agents {
398
+ fqdn := make ([]string , 0 , len (agent .Hosts ))
399
+ for name := range agent .Hosts {
400
+ fqdn = append (fqdn , name .WithTrailingDot ())
401
+ }
402
+ protoAgent := & Agent {
338
403
Id : tailnet .UUIDToByteSlice (agent .ID ),
339
404
Name : agent .Name ,
340
405
WorkspaceId : tailnet .UUIDToByteSlice (agent .WorkspaceID ),
341
406
Fqdn : fqdn ,
342
- IpAddrs : []string {tailnet .CoderServicePrefix .AddrFromUUID (agent .ID ).String ()},
343
- // TODO: Populate
344
- LastHandshake : nil ,
407
+ IpAddrs : hostsToIPStrings (agent .Hosts ),
345
408
}
409
+ agentIP := tailnet .CoderServicePrefix .AddrFromUUID (agent .ID )
410
+ wg .Add (1 )
411
+ go func () {
412
+ defer wg .Done ()
413
+ duration , _ , _ , err := t .conn .Ping (pingCtx , agentIP )
414
+ if err != nil {
415
+ return
416
+ }
417
+ protoAgent .Latency = durationpb .New (duration )
418
+ }()
419
+ diags := t .conn .GetPeerDiagnostics (agent .ID )
420
+ //nolint:revive // outdated rule
421
+ protoAgent .LastHandshake = timestamppb .New (diags .LastWireguardHandshake )
422
+ out = append (out , protoAgent )
423
+ }
424
+ wg .Wait ()
425
+
426
+ return out , nil
427
+ }
428
+
429
+ // saveUpdate saves the workspace update to the tunnel's state, such that it can
430
+ // be used to populate automated peer updates.
431
+ func (t * Tunnel ) saveUpdate (update tailnet.WorkspaceUpdate ) {
432
+ t .mu .Lock ()
433
+ defer t .mu .Unlock ()
434
+
435
+ for _ , agent := range update .UpsertedAgents {
436
+ t .agents [agent .ID ] = agent
437
+ }
438
+ for _ , agent := range update .DeletedAgents {
439
+ delete (t .agents , agent .ID )
440
+ }
441
+ }
442
+
443
+ // sendAgentUpdate sends a peer update message to the manager with the current
444
+ // state of the agents, including the latest network status.
445
+ func (t * Tunnel ) sendAgentUpdate () {
446
+ // The lock must be held until we send the message,
447
+ // else we risk upserting a deleted agent.
448
+ t .mu .Lock ()
449
+ defer t .mu .Unlock ()
450
+
451
+ upsertedAgents , err := t .populateAgents (maps .Values (t .agents ))
452
+ if err != nil {
453
+ t .logger .Error (t .ctx , "failed to produce agent network status update" , slog .Error (err ))
454
+ return
455
+ }
456
+
457
+ if len (upsertedAgents ) == 0 {
458
+ return
459
+ }
460
+
461
+ msg := & TunnelMessage {
462
+ Msg : & TunnelMessage_PeerUpdate {
463
+ PeerUpdate : & PeerUpdate {
464
+ UpsertedAgents : upsertedAgents ,
465
+ },
466
+ },
467
+ }
468
+
469
+ select {
470
+ case <- t .ctx .Done ():
471
+ return
472
+ case t .sendCh <- msg :
346
473
}
347
- return out
348
474
}
349
475
350
476
// the following are taken from sloghuman:
@@ -404,3 +530,17 @@ func quote(key string) string {
404
530
}
405
531
return quoted
406
532
}
533
+
534
+ func hostsToIPStrings (hosts map [dnsname.FQDN ][]netip.Addr ) []string {
535
+ seen := make (map [netip.Addr ]struct {})
536
+ var result []string
537
+ for _ , inner := range hosts {
538
+ for _ , elem := range inner {
539
+ if _ , exists := seen [elem ]; ! exists {
540
+ seen [elem ] = struct {}{}
541
+ result = append (result , elem .String ())
542
+ }
543
+ }
544
+ }
545
+ return result
546
+ }
0 commit comments