Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
esiqveland committed Apr 9, 2019
0 parents commit 7dec12c
Show file tree
Hide file tree
Showing 6 changed files with 278 additions and 0 deletions.
9 changes: 9 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Created by .ignore support plugin (hsz.mobi)
### Example user template template
### Example user template

# IntelliJ project files
.idea
*.iml
out
gen
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# balancer

A simple client side load balancer for go applications.

`balancer` was made to provide easier access to DNS-based load balancing for go services running in kubernetes and was mainly built for http.Client.

## Scope

This project does not do health checking and does not monitor status of any hosts.
This is left up to decide for a consul DNS or kubernetes DNS, and assumes hosts returned are deemed healthy.

`balancer` currently assumes that a lookup will return a non-empty set of initial hosts on startup.

## TODO

- [ ] Decide on a few error scenarios:
- [ ] DNS lookup hangs/times out
- [ ] DNS lookup returned 0 hosts

## Known limitations

- No health checking of hosts.
- Does not respect TTL of dns records as this is not exposed by the Go code.
18 changes: 18 additions & 0 deletions balancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package balancer

import (
"net"

"github.com/pkg/errors"
)

var ErrNoHosts = errors.New("No hosts available in list")

type Balancer interface {
Next() (Host, error)
}

type Host struct {
Address net.IP
Port int
}
55 changes: 55 additions & 0 deletions httpbalancer/httpclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package httpbalancer

import (
"net"
"net/http"
"strconv"

"github.com/esiqveland/balancer"
"github.com/rs/zerolog"
)

func Wrap(client *http.Client, balancer balancer.Balancer) *http.Client {
rt := NewBalancedRoundTripper(balancer, client.Transport)
client.Transport = rt

return client
}

func NewBalancedRoundTripper(balancer balancer.Balancer, delegate http.RoundTripper) http.RoundTripper {
return &balancedRoundTripper{
Delegate: delegate,
Balancer: balancer,
}
}

type balancedRoundTripper struct {
Delegate http.RoundTripper
Balancer balancer.Balancer
}

func (rt *balancedRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
ctx := req.Context()
log := zerolog.Ctx(ctx)

host, err := rt.Balancer.Next()
if err != nil {
return nil, err
}

log.Info().Msgf("selected host=%v", host.Address.String())
//var reqCopy http.Request = *req

selectedHost := net.JoinHostPort(host.Address.String(), strconv.Itoa(host.Port))

// strictly speaking, a RoundTripper is not allowed to mutate the request,
// except for reading and Closing the req.Body so this might have consequences I am not aware of.
req.URL.Host = selectedHost

return rt.Delegate.RoundTrip(req)
}

var (
// make sure we implement http.RoundTripper
_ http.RoundTripper = &balancedRoundTripper{}
)
117 changes: 117 additions & 0 deletions netbalancer/netbalancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package netbalancer

import (
"log"
"net"
"sync"
"sync/atomic"
"time"

"github.com/esiqveland/balancer"
"github.com/pkg/errors"
)

// NewNetBalancer returns a Balancer that uses dns lookups from net.Lookup* to reload a set of hosts every updateInterval.
// We can not use TTL from dns because TTL is not exposed by the Go calls.
func NewNetBalancer(host string, port int, updateInterval time.Duration) (balancer.Balancer, error) {
initialHosts, err := lookup(host, port)
if len(initialHosts) == 0 {
return nil, errors.Wrapf(err, "Error no ips found for host=%v", host)
}

bal := &dnsBalancer{
lookupAddress: host,
port: port,
hosts: initialHosts,
interval: updateInterval,
counter: 0,
quit: make(chan int, 1),
lock: &sync.Mutex{},
}

// start update loop
go bal.update()

return bal, nil
}

type dnsBalancer struct {
lookupAddress string
port int
hosts []balancer.Host
counter uint64
interval time.Duration
quit chan int
lock *sync.Mutex
}

func (b *dnsBalancer) Next() (balancer.Host, error) {
// make sure to store a reference before we start
hosts := b.hosts
count := uint64(len(hosts))
if count == 0 {
return balancer.Host{}, balancer.ErrNoHosts
}

nextNum := atomic.AddUint64(&b.counter, 1)

idx := nextNum % count

return hosts[idx], nil
}

func (b *dnsBalancer) update() {
tick := time.NewTicker(b.interval)

for {
select {
case <-tick.C:
// TODO: timeout
// TODO: retries?
nextHostList, err := lookup(b.lookupAddress, b.port)
if err != nil {
// TODO: set hostList to empty?
// TODO: log?
} else {
if len(nextHostList) > 0 {
log.Printf("[DnsBalancer] reloaded dns=%v hosts=%v", b.lookupAddress, nextHostList)
b.lock.Lock()
b.hosts = nextHostList
b.lock.Unlock()
}
}
case <-b.quit:
tick.Stop()
return
}
}
}

func lookup(host string, port int) ([]balancer.Host, error) {
ips, err := net.LookupIP(host)
if err != nil {
return nil, errors.Wrapf(err, "Error looking up initial list for host=%v", host)
}

if len(ips) == 0 {
return nil, balancer.ErrNoHosts
}

hosts := []balancer.Host{}
for k := range ips {
entry := balancer.Host{
Address: ips[k],
Port: port,
}
hosts = append(hosts, entry)
}

return hosts, nil
}

func (b *dnsBalancer) Close() error {
// TODO: wait for exit
b.quit <- 1

return nil
}
56 changes: 56 additions & 0 deletions srvbalancer/srvbalancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package srvbalancer

import (
"net"

"github.com/benschw/srv-lb/lb"
"github.com/esiqveland/balancer"
"github.com/pkg/errors"
)

// NewSRVBalancer creates a new balancer based on lookup of DNS SRV records.
// example usage: name: "_http", proto: "_tcp", host: "backend.namespace.kube.dc.org"
func NewSRVBalancer(name, proto, host string) (balancer.Balancer, error) {
srvName := name + "." + proto + "." + host

cfg, err := lb.DefaultConfig()
if err != nil {
return nil, err
}

l := lb.New(cfg, srvName)

_, err = l.Next()
if err != nil {
return nil, errors.Wrapf(err, "error looking up host=%v", srvName)
}

bal := &srvBalancer{
balancer: l,
}

return bal, nil
}

type srvBalancer struct {
balancer lb.LoadBalancer
}

func (lb *srvBalancer) Next() (balancer.Host, error) {
addr, err := lb.balancer.Next()
if err != nil {
return balancer.Host{}, err
}

ip := net.ParseIP(addr.Address)
if ip == nil {
return balancer.Host{}, errors.Errorf("unable to parse ip=%v", addr.Address)
}

host := balancer.Host{
Address: ip,
Port: int(addr.Port),
}

return host, nil
}

0 comments on commit 7dec12c

Please sign in to comment.