package sync import ( "path" "github.com/go-zookeeper/zk" ) // NodeSync structure type NodeSync struct { Zk *zk.Conn rootPath string } // Inside private environment keep everything open var defaultAcl = zk.WorldACL(zk.PermAll) // The length of a postfix sequence, see http://zookeeper.apache.org/doc/current/zookeeperProgrammers.html#Sequence+Nodes+--+Unique+Naming var sequenceLen = 10 // Name is used by default to create keys var defaultKey = "sync" // New creates a new nodesync instance func New(zkconn *zk.Conn, rootPath string) (nodeSync *NodeSync, err error) { nodeSync = &NodeSync{Zk: zkconn, rootPath: rootPath} err = nodeSync.createRecursively(rootPath, defaultAcl) if err != nil { nodeSync = nil } return } // Fetch returns node data func (s *NodeSync) Fetch(where string, key string) (current []byte, err error) { workingPath := path.Join(s.rootPath, where, key) current, _, err = s.Zk.Get(workingPath) return } // FetchAndSet replaces zookeeper key with new value bytes and returns old if exists func (s *NodeSync) FetchAndSet(where string, key string, value []byte) (old []byte, err error) { workingPath := path.Join(s.rootPath, where, key) err = s.createRecursively(workingPath, defaultAcl) if err != nil { return } var ver int32 = -1 old, stat, err := s.Zk.Get(workingPath) if err == nil { ver = stat.Version } _, err = s.Zk.Set(workingPath, value, ver) return old, err } // Lock creates zookeeper ephemeral key to ensure itself in the node list. // This is used by the locking mechanism to emulate mutex in a cluster. func (s *NodeSync) Lock(where string) (iam string, err error) { workingPath := path.Join(s.rootPath, where) err = s.createRecursively(workingPath, defaultAcl) if err != nil { return } created, err := s.Zk.CreateProtectedEphemeralSequential(path.Join(workingPath, defaultKey), nil, defaultAcl) if err != nil { return } _, iam = path.Split(created) return } // Unlock waits for the previous worker be unlocked, then removes itself func (s *NodeSync) Unlock(where string, iam string) (err error) { workingPath := path.Join(s.rootPath, where) children, _, err := s.Zk.Children(workingPath) if err != nil { return err } if len(children) == 0 { return } children = sortChildren(children) locked := children[0] if locked == iam { err = s.Zk.Delete(path.Join(workingPath, iam), 0) return } for i := 1; i < len(children); i++ { if children[i] == iam { break } locked = children[i] } for { var exists bool var ev <-chan zk.Event exists, _, ev, err = s.Zk.ExistsW(path.Join(workingPath, locked)) if err != nil { return } if !exists { break } change := <-ev if change.Type == zk.EventNodeDeleted { break } } err = s.Zk.Delete(path.Join(workingPath, iam), 0) return } // WaitEmpty implements a barrier synchronisation mechanism by waiting for empty node. func (s *NodeSync) WaitEmpty(where string) (err error) { workingPath := path.Join(s.rootPath, where) for { var children []string var ev <-chan zk.Event children, _, ev, err = s.Zk.ChildrenW(workingPath) if err != nil { return } if len(children) == 0 { break } <-ev } return } // WaitChanged waits for any changes a zookeeper node and returns a new children list after the change. // Note: the children list is always stable sorted. func (s *NodeSync) WaitChanged(where string) (children []string, ev <-chan zk.Event, err error) { workingPath := path.Join(s.rootPath, where) children, _, ev, err = s.Zk.ChildrenW(workingPath) if err != nil { return } children = sortChildren(children) return }