From 8977dfc9208eab578f75fbee7aa41cc0a5914b9c Mon Sep 17 00:00:00 2001 From: Nikita Tokarchuk Date: Wed, 13 Apr 2022 23:48:51 +0200 Subject: [PATCH] Implement lock and unlock mechanism --- nodesync.go | 76 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/nodesync.go b/nodesync.go index f9932a0..436338e 100644 --- a/nodesync.go +++ b/nodesync.go @@ -19,6 +19,9 @@ var defaultAcl = zk.WorldACL(zk.PermAll) // The length of a postfix sequence, see http://zookeeper.apache.org/doc/current/zookeeperProgrammers.html#Sequence+Nodes+--+Unique+Naming var sequenceLen = 10 +// Name is used by default to create keys +var defaultKey = "sync" + // New creates a new nodesync instance func New(zkconn *zk.Conn, rootPath string) (nodeSync *NodeSync, err error) { @@ -60,4 +63,77 @@ func (s *NodeSync) FetchAndSet(where string, key string, value []byte) (old []by _, err = s.Zk.Set(workingPath, value, ver) return old, err +} + +// Lock creates zookeeper ephemeral key to ensure itself in the node list. +// This is used by the locking mechanism to emulate mutex in a cluster. +func (s *NodeSync) Lock(where string) (iam string, err error) { + + workingPath := path.Join(s.rootPath, where) + + err = s.createRecursively(workingPath, defaultAcl) + if err != nil { + return + } + + created, err := s.Zk.CreateProtectedEphemeralSequential(path.Join(workingPath, defaultKey), nil, defaultAcl) + if err != nil { + return + } + + _, iam = path.Split(created) + + return +} + +// Unlock waits for the previous worker be unlocked, then removes itself +func (s *NodeSync) Unlock(where string, iam string) (err error) { + + workingPath := path.Join(s.rootPath, where) + + children, _, err := s.Zk.Children(workingPath) + if err != nil { + return err + } + if len(children) == 0 { + return + } + + children = sortChildren(children) + + locked := children[0] + if locked == iam { + err = s.Zk.Delete(path.Join(workingPath, iam), 0) + return + } + + for i := 1; i < len(children); i++ { + if children[i] == iam { + break + } + locked = children[i] + } + + for { + var exists bool + var ev <-chan zk.Event + + exists, _, ev, err = s.Zk.ExistsW(path.Join(workingPath, locked)) + if err != nil { + return + } + if !exists { + break + } + + change := <-ev + + if change.Type == zk.EventNodeDeleted { + break + } + } + + err = s.Zk.Delete(path.Join(workingPath, iam), 0) + + return } \ No newline at end of file