-
Notifications
You must be signed in to change notification settings - Fork 4
/
processor.go
123 lines (107 loc) · 3.54 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// Copyright 2021 Lumberjack authors (see AUTHORS file)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package remote defines a remote audit log processor.
package remote
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
api "github.com/abcxyz/lumberjack/clients/go/apis/v1alpha1"
)
// Option is the option to set up a remote audit log processor.
type Option func(*Processor) error
// WithGRPCDialOptions allows provide raw grpc.DialOption for the underlying connection.
func WithGRPCDialOptions(opts ...grpc.DialOption) Option {
return func(p *Processor) error {
p.rawDialOpts = opts
return nil
}
}
// WithDefaultAuth sets up the processor to connect to remote with the default auth setting.
func WithDefaultAuth() Option {
return WithIDTokenAuth(context.Background())
}
// grpcAuthOptions is the interface to get gRPC DialOptions and CallOptions
// specific to an auth setting.
type grpcAuthOptions interface {
dialOpts() ([]grpc.DialOption, error)
callOpts() ([]grpc.CallOption, error)
}
// Processor is the remote audit log processor.
type Processor struct {
address string
conn *grpc.ClientConn
client api.AuditLogAgentClient
authOpts grpcAuthOptions
rawDialOpts []grpc.DialOption
}
// NewProcessor creates a new remote audit log processor.
//
// E.g.
//
// p, err := NewProcessor("localhost:8080", WithDefaultAuth())
// if err != nil { ... }
// defer p.Close()
func NewProcessor(address string, opts ...Option) (*Processor, error) {
p := &Processor{address: address}
for _, o := range opts {
if err := o(p); err != nil {
return nil, fmt.Errorf("failed to set option: %w", err)
}
}
dialOpts := append(p.rawDialOpts, grpc.WithAuthority(address))
if p.authOpts != nil {
authDialOpts, err := p.authOpts.dialOpts()
if err != nil {
return nil, fmt.Errorf("failed to generate gRPC auth dial options: %w", err)
}
dialOpts = append(dialOpts, authDialOpts...)
} else {
// If no auth option is provided, fall back to insecure.
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
conn, err := grpc.Dial(address, dialOpts...)
if err != nil {
return nil, fmt.Errorf("dial remote log processor failed: %w", err)
}
p.conn = conn
p.client = api.NewAuditLogAgentClient(conn)
return p, nil
}
// Process processes the audit log request by calling a remote service.
func (p *Processor) Process(ctx context.Context, logReq *api.AuditLogRequest) error {
var authCallOpts []grpc.CallOption
if p.authOpts != nil {
var err error
authCallOpts, err = p.authOpts.callOpts()
if err != nil {
return fmt.Errorf("failed to generate gRPC auth call options: %w", err)
}
}
resp, err := p.client.ProcessLog(ctx, logReq, authCallOpts...)
if err != nil {
return fmt.Errorf("remote log processing failed: %w", err)
}
if resp.Result != nil {
logReq.Labels = resp.Result.Labels
logReq.Payload = resp.Result.Payload
logReq.Type = resp.Result.Type
}
return nil
}
// Stop stops the processor.
func (p *Processor) Stop() error {
return p.conn.Close()
}