Skip to content

Commit

Permalink
netbalancer: add srvbalancer
Browse files Browse the repository at this point in the history
  • Loading branch information
esiqveland committed Apr 9, 2019
1 parent 7dec12c commit 1fd6bce
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 4 deletions.
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@ A simple client side load balancer for go applications.
This project does not do health checking and does not monitor status of any hosts.
This is left up to decide for a consul DNS or kubernetes DNS, and assumes hosts returned are deemed healthy.

This library does not retry or otherwise try to fix problems, leaving this up to the caller.

`balancer` currently assumes that a lookup will return a non-empty set of initial hosts on startup.

## TODO

- [ ] Decide on a few error scenarios:
- [ ] DNS lookup hangs/times out
- [ ] DNS lookup returned 0 hosts
- [ ] `netbalancer`: implement a timeout?
- [ ] `netbalancer`: DNS lookup returned 0 hosts
- [ ] `netbalancer`: DNS lookup returned error

## Known limitations

- No health checking of hosts.
- Does not respect TTL of dns records as this is not exposed by the Go code.
- `netbalancer` does not respect TTL of dns records as this is not exposed by the Go code.
2 changes: 1 addition & 1 deletion netbalancer/netbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

// NewNetBalancer returns a Balancer that uses dns lookups from net.Lookup* to reload a set of hosts every updateInterval.
// We can not use TTL from dns because TTL is not exposed by the Go calls.
func NewNetBalancer(host string, port int, updateInterval time.Duration) (balancer.Balancer, error) {
func New(host string, port int, updateInterval time.Duration) (balancer.Balancer, error) {
initialHosts, err := lookup(host, port)
if len(initialHosts) == 0 {
return nil, errors.Wrapf(err, "Error no ips found for host=%v", host)
Expand Down
133 changes: 133 additions & 0 deletions netbalancer/srvbalancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package netbalancer

import (
"context"
"log"
"net"
"sync"
"sync/atomic"
"time"

"github.com/esiqveland/balancer"
"github.com/pkg/errors"
)

type dnsSrvBalancer struct {
serviceName string
proto string
host string
hosts []balancer.Host
counter uint64
interval time.Duration
quit chan int
lock *sync.Mutex
}

// NewNetBalancer returns a Balancer that uses dns lookups from net.Lookup* to reload a set of hosts every updateInterval.
// We can not use TTL from dns because TTL is not exposed by the Go calls.
func NewSRV(servicename, proto, host string, updateInterval time.Duration) (balancer.Balancer, error) {
initialHosts, err := lookupSRV(servicename, proto, host)
if err != nil {
return nil, err
}

if len(initialHosts) == 0 {
return nil, errors.Wrapf(err, "Error no ips found for host=%v", host)
}

bal := &dnsSrvBalancer{
serviceName: servicename,
proto: proto,
host: host,
hosts: initialHosts,
interval: updateInterval,
counter: 0,
quit: make(chan int, 1),
lock: &sync.Mutex{},
}

// start update loop
go bal.update()

return bal, nil
}

func lookupSRV(servicename, proto, host string) ([]balancer.Host, error) {
hosts := []balancer.Host{}

_, addrs, err := net.LookupSRV(servicename, proto, host)
if err != nil {
return hosts, err
}

var firstErr error = nil

for _, v := range addrs {
ips, err := net.DefaultResolver.LookupIPAddr(context.TODO(), v.Target)
if err != nil {
if firstErr == nil {
firstErr = err
}
continue
}

for e := range ips {
host := balancer.Host{
Address: ips[e].IP,
Port: int(v.Port),
}
hosts = append(hosts, host)
}
}

return hosts, firstErr
}

func (b *dnsSrvBalancer) Next() (balancer.Host, error) {
// make sure to store a reference before we start
hosts := b.hosts
count := uint64(len(hosts))
if count == 0 {
return balancer.Host{}, balancer.ErrNoHosts
}

nextNum := atomic.AddUint64(&b.counter, 1)

idx := nextNum % count

return hosts[idx], nil
}

func (b *dnsSrvBalancer) update() {
tick := time.NewTicker(b.interval)

for {
select {
case <-tick.C:
// TODO: timeout
// TODO: retries?
nextHostList, err := lookupSRV(b.serviceName, b.proto, b.host)
if err != nil {
// TODO: set hostList to empty?
// TODO: log?
} else {
if len(nextHostList) > 0 {
log.Printf("[DnsBalancer] reloaded dns=%v hosts=%v", b.host, nextHostList)
b.lock.Lock()
b.hosts = nextHostList
b.lock.Unlock()
}
}
case <-b.quit:
tick.Stop()
return
}
}
}

func (b *dnsSrvBalancer) Close() error {
// TODO: wait for exit
b.quit <- 1

return nil
}

0 comments on commit 1fd6bce

Please sign in to comment.