Skip to content
Snippets Groups Projects
Commit e1be329b authored by kaiyou's avatar kaiyou
Browse files

Improve documentation and code structure

parent b8adbcad
No related branches found
No related tags found
Loading
...@@ -21,56 +21,69 @@ var rootCmd = &cobra.Command{ ...@@ -21,56 +21,69 @@ var rootCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
logrus.SetLevel(logrus.DebugLevel) logrus.SetLevel(logrus.DebugLevel)
if config.Contained { if config.Contained {
currentIP := WaitForIP() run()
logrus.Debug("Current IP is: ", currentIP.String())
c := cluster.New(
config.ClusterName, config.NodeName,
currentIP, config.DiscoveryPort,
config.Anchors.IPSlice(),
config.Key,
)
err := c.Run()
if err != nil {
logrus.Fatal(err)
}
} else { } else {
c, err := selfcontain.New( containerize()
config.NodeName,
config.LibcontainerPath(),
config.RootPath(),
append(os.Args[1:], "--contained"),
)
if err != nil {
logrus.Fatal(err)
}
dns := config.DNS.IPSlice()
// Use default DNS if required
if len(dns) == 0 {
dns = defaultDNS
}
err = c.SetupNetworking(
config.Master,
*config.IP.IPNet(), config.Gw.IP, dns,
config.RootPath(),
)
if err != nil {
logrus.Fatal(err)
}
s := make(chan os.Signal, 1)
signal.Notify(s, os.Interrupt)
go func() {
<-s
c.Destroy()
}()
err = c.Run()
if err != nil {
logrus.Fatal(err)
}
} }
}, },
} }
func WaitForIP() net.IP { // Actually run hepto
func run() {
currentIP := waitForIP()
logrus.Debug("Current IP is: ", currentIP.String())
c := cluster.New(
config.ClusterName, config.NodeName,
currentIP, config.DiscoveryPort,
config.Anchors.IPSlice(),
config.Key,
)
err := c.Run()
if err != nil {
logrus.Fatal(err)
}
}
// Run ourselves inside a container
func containerize() {
c, err := selfcontain.New(
config.NodeName,
config.LibcontainerPath(),
config.RootPath(),
append(os.Args[1:], "--contained"),
)
if err != nil {
logrus.Fatal(err)
}
dns := config.DNS.IPSlice()
// Use default DNS if required
if len(dns) == 0 {
dns = defaultDNS
}
err = c.SetupNetworking(
config.Master,
*config.IP.IPNet(), config.Gw.IP, dns,
config.RootPath(),
)
if err != nil {
logrus.Fatal(err)
}
// Make sure we are notified and that we destroy the
// container upon being interrupted
s := make(chan os.Signal, 1)
signal.Notify(s, os.Interrupt)
go func() {
<-s
c.Destroy()
}()
err = c.Run()
if err != nil {
logrus.Fatal(err)
}
}
// Guess the current IP address
func waitForIP() net.IP {
for { for {
logrus.Debug("Determining current IP address") logrus.Debug("Determining current IP address")
conn, err := net.Dial("udp", fmt.Sprintf("[%s]:53", defaultDNS[0].String())) conn, err := net.Dial("udp", fmt.Sprintf("[%s]:53", defaultDNS[0].String()))
......
// Hepto cluster implements the main clustering logics, based on
// sml (Simple Memberlist) for cluster bootstraping and node discovery
package cluster package cluster
import ( import (
......
...@@ -6,8 +6,9 @@ import ( ...@@ -6,8 +6,9 @@ import (
"forge.tedomum.net/acides/hepto/hepto/pkg/sml" "forge.tedomum.net/acides/hepto/hepto/pkg/sml"
) )
// Represents a node metadata metadata // Represents a node metadata
type HeptoMeta struct { type HeptoMeta struct {
// Public key for the wireguard mesh VPN
VpnKey []byte `json:"vpnKey"` VpnKey []byte `json:"vpnKey"`
} }
......
...@@ -102,7 +102,7 @@ var readOnlyPath = []string{ ...@@ -102,7 +102,7 @@ var readOnlyPath = []string{
"/proc/bus", "/proc/bus",
} }
// Make a container configuration // Make a container configuration using defaults
func makeConfig(name string, newRoot string) *configs.Config { func makeConfig(name string, newRoot string) *configs.Config {
deviceRules := []*devices.Rule{} deviceRules := []*devices.Rule{}
for _, device := range allowedDevices { for _, device := range allowedDevices {
......
...@@ -17,6 +17,9 @@ import ( ...@@ -17,6 +17,9 @@ import (
const ACCEPT_PINFO = "net.ipv6.conf.eth0.accept_ra_pinfo" const ACCEPT_PINFO = "net.ipv6.conf.eth0.accept_ra_pinfo"
const ACCEPT_DFTRTR = "net.ipv6.conf.eth0.accept_ra_defrtr" const ACCEPT_DFTRTR = "net.ipv6.conf.eth0.accept_ra_defrtr"
// Setup networking inside the container
// This must be called from outside the container, since it requires both access to the
// host networking stack and the namespace networking stack
func (c *Container) SetupNetworking(master string, ip net.IPNet, gw net.IP, dns []net.IP, root string) error { func (c *Container) SetupNetworking(master string, ip net.IPNet, gw net.IP, dns []net.IP, root string) error {
ifaceName, err := c.setupIPVlan(master, 1500) ifaceName, err := c.setupIPVlan(master, 1500)
if err != nil { if err != nil {
......
// Simple Memberlist is a wrapper around hashicorp memberlist, that
// leverages the gossip protocol and exposes a simpler interface.
// Main features include: automatic rejoining a list of cluster anchors,
// providing a channel of node changes, encoding and decoding of node
// metadata, plus caching of decoded metadata.
package sml package sml
import ( import (
...@@ -24,7 +29,7 @@ type Memberlist struct { ...@@ -24,7 +29,7 @@ type Memberlist struct {
func New(nodeName string, nodeIP net.IP, port int, anchors []net.IP, key []byte, newMeta func() NodeMeta) *Memberlist { func New(nodeName string, nodeIP net.IP, port int, anchors []net.IP, key []byte, newMeta func() NodeMeta) *Memberlist {
config := memberlist.DefaultLANConfig() config := memberlist.DefaultLANConfig()
config.LogOutput = logrus.StandardLogger().Writer() config.LogOutput = logrus.StandardLogger().WriterLevel(logrus.DebugLevel)
config.SecretKey = key config.SecretKey = key
config.BindAddr = nodeIP.String() config.BindAddr = nodeIP.String()
config.BindPort = port config.BindPort = port
...@@ -51,6 +56,8 @@ func New(nodeName string, nodeIP net.IP, port int, anchors []net.IP, key []byte, ...@@ -51,6 +56,8 @@ func New(nodeName string, nodeIP net.IP, port int, anchors []net.IP, key []byte,
return m return m
} }
// Start the memberlist cluster main loop, that awaits cluster changes, maintains
// the cluster state and propagates information to channels
func (m *Memberlist) Run() error { func (m *Memberlist) Run() error {
// Initialize memberlist // Initialize memberlist
ml, err := memberlist.Create(m.config) ml, err := memberlist.Create(m.config)
...@@ -64,26 +71,31 @@ func (m *Memberlist) Run() error { ...@@ -64,26 +71,31 @@ func (m *Memberlist) Run() error {
case <-ticker: case <-ticker:
go m.join() go m.join()
case <-m.nodeChanges: case <-m.nodeChanges:
logrus.Info("Network topology changed") logrus.Debug("Network topology changed, a node just joined, left or was updated")
for _, node := range m.Nodes() { for _, node := range m.Nodes() {
logrus.Debugf("Node %s: %s ; %v\n", node.Name, string(node.Meta), node.NodeMeta) logrus.Debugf("* Node %s: %s ; %v\n", node.Name, string(node.Meta), node.NodeMeta)
} }
} }
} }
} }
// Get the list of current cluster nodes
func (m *Memberlist) Nodes() []Node { func (m *Memberlist) Nodes() []Node {
return m.nodeCache return m.nodeCache
} }
// Build a local node representation from an upstream memberlist node, using
// empty metadata (before decoding)
func (m *Memberlist) newNode(mlNode *memberlist.Node) Node { func (m *Memberlist) newNode(mlNode *memberlist.Node) Node {
return Node{mlNode, m.newMeta()} return Node{mlNode, m.newMeta()}
} }
// Update the node cache after a network change, goes through all the
// nodes and decodes metadata
func (m *Memberlist) updateCache() { func (m *Memberlist) updateCache() {
// Do not update the node cache until memberlist is setup // Do not update the node cache until memberlist is setup
if m.ml == nil { if m.ml == nil {
logrus.Debug("Not updating the node cache before ml is ready") logrus.Debug("Not updating the node cache before memberlist is ready")
return return
} }
members := m.ml.Members() members := m.ml.Members()
...@@ -97,6 +109,7 @@ func (m *Memberlist) updateCache() { ...@@ -97,6 +109,7 @@ func (m *Memberlist) updateCache() {
m.nodeChanges <- struct{}{} m.nodeChanges <- struct{}{}
} }
// Try and join any anchor that is not currently a cluster member
func (m *Memberlist) join() error { func (m *Memberlist) join() error {
addrs := []string{} addrs := []string{}
for _, candidate := range m.anchors { for _, candidate := range m.anchors {
...@@ -113,7 +126,7 @@ func (m *Memberlist) join() error { ...@@ -113,7 +126,7 @@ func (m *Memberlist) join() error {
addrs = append(addrs, candidate.String()) addrs = append(addrs, candidate.String())
} }
if len(addrs) > 0 { if len(addrs) > 0 {
logrus.Info("Joining nodes: ", addrs) logrus.Info("Joining cluster nodes: ", addrs)
} }
_, err := m.ml.Join(addrs) _, err := m.ml.Join(addrs)
return err return err
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment