16 June 2019
Leader election is the process of designating a single node as the organizer of some task distributed among several nodes. The leader will be responsible for managing the others and coordinate the actions performed by other nodes. If for any reason the leader fails, other nodes will elect another leader and so on. This can help to ensure that nodes don’t conflict with each other.
Implementing the leader election algorithm with consul is quite easy:
It is just a naive implementation So do NOT use this in production.
Step 1: Download Consul and run on dev mode
$ ./consul agent -dev
Step 2: Register the node as a service on Consul
import (
"github.com/hashicorp/consul/api"
"time"
"fmt"
)
// configs to connect to consul
client, err := api.NewClient(&api.Config{
Address: "127.0.0.1:8500",
Scheme: "http",
})
if err != nil {
panic(err)
}
err = client.Agent().ServiceRegister(&api.AgentServiceRegistration{
Address: "http://127.0.0.1:8080",
ID: "node01_monitoring", // Unique for each node
Name: "monitoring", // Can be service type
Tags: []string{"monitoring"},
Check: &api.AgentServiceCheck{
HTTP: "http://127.0.0.1:8080/_health",
Interval: "10s",
},
})
if err != nil {
panic(err)
}
Step 3: Create a session
sessionID, _, err := client.Session().Create(&api.SessionEntry{
Name: "service/monitoring/leader", // distributed lock
Behavior: "delete",
TTL: "10s",
}, nil)
if err != nil {
panic(err)
}
Step 4: Try to put a Lock on that session. If you succeed you are leader if not… well you are not the leader.
isLeader, _, err := client.KV().Acquire(&api.KVPair{
Key: "service/monitoring/leader", // distributed lock
Value: []byte(sessionID),
Session: sessionID,
}, nil)
if err != nil {
panic(err)
}
fmt.Println(isLeader)
Step 5: Since we defined the time to live for each session (10s). If the leader fails to renew the session for that time, consul will mark the session as invalid and delete or release based on the defined behavior.
doneChan := make(chan struct{})
defer close(doneChan)
go func(){
// RenewPeriodic is used to periodically invoke Session.Renew on a
// session until a doneChan is closed. This is meant to be used in a long running
// goroutine to ensure a session stays valid.
client.Session().RenewPeriodic(
"10s",
sessionID,
nil,
doneChan,
)
}()
time.Sleep(90 * time.Second)
Let’s wrap things up:
package main
import (
"github.com/hashicorp/consul/api"
"time"
"fmt"
)
func main() {
// configs to connect to consul
client, err := api.NewClient(&api.Config{
Address: "127.0.0.1:8500",
Scheme: "http",
})
if err != nil {
panic(err)
}
err = client.Agent().ServiceRegister(&api.AgentServiceRegistration{
Address: "http://127.0.0.1:8080",
ID: "node01_monitoring", // Unique for each node
Name: "monitoring", // Can be service type
Tags: []string{"monitoring"},
Check: &api.AgentServiceCheck{
HTTP: "http://127.0.0.1:8080/_health",
Interval: "10s",
},
})
if err != nil {
panic(err)
}
sessionID, _, err := client.Session().Create(&api.SessionEntry{
Name: "service/monitoring/leader", // distributed lock
Behavior: "delete",
TTL: "10s",
}, nil)
if err != nil {
panic(err)
}
isLeader, _, err := client.KV().Acquire(&api.KVPair{
Key: "service/monitoring/leader", // distributed lock
Value: []byte(sessionID),
Session: sessionID,
}, nil)
if err != nil {
panic(err)
}
fmt.Println(isLeader)
doneChan := make(chan struct{})
defer close(doneChan)
go func(){
// RenewPeriodic is used to periodically invoke Session.Renew on a
// session until a doneChan is closed. This is meant to be used in a long running
// goroutine to ensure a session stays valid.
client.Session().RenewPeriodic(
"10s",
sessionID,
nil,
doneChan,
)
}()
time.Sleep(90 * time.Second)
}
References: