mirror of
https://github.com/mainnika/nodesync.git
synced 2026-05-23 16:23:34 +00:00
Implement lock and unlock mechanism
This commit is contained in:
+76
@@ -19,6 +19,9 @@ var defaultAcl = zk.WorldACL(zk.PermAll)
|
||||
// The length of a postfix sequence, see http://zookeeper.apache.org/doc/current/zookeeperProgrammers.html#Sequence+Nodes+--+Unique+Naming
|
||||
var sequenceLen = 10
|
||||
|
||||
// Name is used by default to create keys
|
||||
var defaultKey = "sync"
|
||||
|
||||
// New creates a new nodesync instance
|
||||
func New(zkconn *zk.Conn, rootPath string) (nodeSync *NodeSync, err error) {
|
||||
|
||||
@@ -60,4 +63,77 @@ func (s *NodeSync) FetchAndSet(where string, key string, value []byte) (old []by
|
||||
_, err = s.Zk.Set(workingPath, value, ver)
|
||||
|
||||
return old, err
|
||||
}
|
||||
|
||||
// Lock creates zookeeper ephemeral key to ensure itself in the node list.
|
||||
// This is used by the locking mechanism to emulate mutex in a cluster.
|
||||
func (s *NodeSync) Lock(where string) (iam string, err error) {
|
||||
|
||||
workingPath := path.Join(s.rootPath, where)
|
||||
|
||||
err = s.createRecursively(workingPath, defaultAcl)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
created, err := s.Zk.CreateProtectedEphemeralSequential(path.Join(workingPath, defaultKey), nil, defaultAcl)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
_, iam = path.Split(created)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Unlock waits for the previous worker be unlocked, then removes itself
|
||||
func (s *NodeSync) Unlock(where string, iam string) (err error) {
|
||||
|
||||
workingPath := path.Join(s.rootPath, where)
|
||||
|
||||
children, _, err := s.Zk.Children(workingPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(children) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
children = sortChildren(children)
|
||||
|
||||
locked := children[0]
|
||||
if locked == iam {
|
||||
err = s.Zk.Delete(path.Join(workingPath, iam), 0)
|
||||
return
|
||||
}
|
||||
|
||||
for i := 1; i < len(children); i++ {
|
||||
if children[i] == iam {
|
||||
break
|
||||
}
|
||||
locked = children[i]
|
||||
}
|
||||
|
||||
for {
|
||||
var exists bool
|
||||
var ev <-chan zk.Event
|
||||
|
||||
exists, _, ev, err = s.Zk.ExistsW(path.Join(workingPath, locked))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if !exists {
|
||||
break
|
||||
}
|
||||
|
||||
change := <-ev
|
||||
|
||||
if change.Type == zk.EventNodeDeleted {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
err = s.Zk.Delete(path.Join(workingPath, iam), 0)
|
||||
|
||||
return
|
||||
}
|
||||
Reference in New Issue
Block a user