zookeeper基础

12-19 331 views

zookeeper基础原理及介绍路径如下:
做个小实验:让zk维护my.cnf配置文件
  zookeeper 集群通常是用来对用户的分布式应用程序提供协调服务的,为了保证数据的一致性,对 zookeeper 集群进行了这样三种角色划分:leader、follower、observer分别对应着总统、议员和观察者。
先搭建一个zk集群(推荐3台,原因看下面的容错率及防脑裂)
zk1 192.168.0.241
zk2 192.168.0.242
zk3 192.168.0.15
  ①、容错率
  首先从容错率来说明:(需要保证集群能够有半数进行投票)
  2台服务器,至少2台正常运行才行(2的半数为1,半数以上最少为2),正常运行1台服务器都不允许挂掉,但是相对于 单节点服务器,2台服务器还有两个单点故障,所以直接排除了。
  3台服务器,至少2台正常运行才行(3的半数为1.5,半数以上最少为2),正常运行可以允许1台服务器挂掉
  4台服务器,至少3台正常运行才行(4的半数为2,半数以上最少为3),正常运行可以允许1台服务器挂掉
  5台服务器,至少3台正常运行才行(5的半数为2.5,半数以上最少为3),正常运行可以允许2台服务器挂掉
  ②、防脑裂
  脑裂集群的脑裂通常是发生在节点之间通信不可达的情况下,集群会分裂成不同的小集群,小集群各自选出自己的leader节点,导致原有的集群出现多个leader节点的情况,这就是脑裂。
  3台服务器,投票选举半数为1.5,一台服务裂开,和另外两台服务器无法通行,这时候2台服务器的集群(2票大于半数1.5票),所以可以选举出leader,而 1 台服务器的集群无法选举。
  4台服务器,投票选举半数为2,可以分成 1,3两个集群或者2,2两个集群,对于 1,3集群,3集群可以选举;对于2,2集群,则不能选择,造成没有leader节点。
  5台服务器,投票选举半数为2.5,可以分成1,4两个集群,或者2,3两集群,这两个集群分别都只能选举一个集群,满足zookeeper集群搭建数目。
  以上分析,我们从容错率以及防止脑裂两方面说明了3台服务器是搭建集群的最少数目,4台发生脑裂时会造成没有leader节点的错误。
一、安装zk集群
1.0要安装jdk环境。zk依赖于jdk
查看有没有安装jdk
# java -version
没有的话yum安装
# yum install java-1.8.0*
1.1.下载最新zk包到三台机器上/data/zk下面
1.2.解压zk包,并修改其配置文件
# tar xvf apache-zookeeper-3.6.2-bin.tar.gz
# cd /data/zk/apache-zookeeper-3.6.2-bin/conf
# mv zoo_sample.cfg zoo.cfg
1.3 配置文件解析
三台配置文件一样:
 上面红色框住的内容即是我们修改的内容:
  ①、tickTime:基本事件单元,这个时间是作为Zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔,每隔tickTime时间就会发送一个心跳;最小 的session过期时间为2倍tickTime
  ②、dataDir:存储内存中数据库快照的位置,除非另有说明,否则指向数据库更新的事务日志。注意:应该谨慎的选择日志存放的位置,使用专用的日志存储设备能够大大提高系统的性能,如果将日志存储在比较繁忙的存储设备上,那么将会很大程度上影像系统性能。
  ③、client:监听客户端连接的端口。
  ④、initLimit:允许follower连接并同步到Leader的初始化连接时间,以tickTime为单位。当初始化连接时间超过该值,则表示连接失败。
  ⑤、syncLimit:表示Leader与Follower之间发送消息时,请求和应答时间长度。如果follower在设置时间内不能与leader通信,那么此follower将会被丢弃。
  ⑥、server.A=B:C:D
    A:其中 A 是一个数字,表示这个是服务器的编号;
    B:是这个服务器的 ip 地址;
    C:Leader选举的端口;
    D:Zookeeper服务器之间的通信端口。
  我们需要修改的第一个是 dataDir ,在指定的位置处创建好目录。
  第二个需要新增的是 server.A=B:C:D 配置,其中 A 对应下面我们即将介绍的myid 文件。B是集群的各个IP地址,C:D 是端口配置。
在 上一步 dataDir 指定的目录下,创建 myid 文件。并且写上相应配置数字即可。
第一台
# mkdir /tmp/zookeeper && echo 1 > /tmp/zookeeper/myid
第二台
# mkdir /tmp/zookeeper && echo 2 > /tmp/zookeeper/myid
第三台
# mkdir /tmp/zookeeper && echo 3 > /tmp/zookeeper/myid
1.4修改环境变量
为了能够在任意目录启动zookeeper集群,我们需要配置环境变量。
ps:你也可以不配,这不是搭建集群的必要操作,只不过如果你不配置环境变量,那么每次启动zookeeper需要到安装文件的 bin 目录下去启动。
首先进入到 /etc/profile 目录,添加相应的配置信息(具体路径要看你自己解压的路径位置)
#set zookeeper environment
export ZK_HOME=/data/zk/apache-zookeeper-3.6.2-bin
export PATH=$PATH:$ZK_HOME/bin
使得环境变量生效:
# source /etc/profile
1.5启动zk集群
zkServer.sh [start/stop/restart/status/start-foreground/version/print-cmd] 分别是启动、停止、重启、查看zk集群状态、尝试启动并弹出启动日志、版本、打印出启动参数及命令。
把leader那台的zk停止掉,你会发现,其他两台 follower会有一台变成leader了
集群搭建完毕。。
查看zk日志:
# zkServer.sh print-cmd
可以看出后面out输出到哪里了
这就是日志所在路径
二、Znode节点
Znode具有如下特性:
  • Watches:客户端可以在节点上设置Watches(可以叫做监视器)。当节点状态发生变化时,就会触发监视器对应的操作,当监视器被触发时,ZK服务器会向客户端发送且只发送一个通知
  • 数据访问:ZK上存储的数据需要被原子性的操作(要么修改成功要么回到原样),也是就读操作将会读取节点相关所有数据,写操作也会修改节点相关所有数据,,而且每个节点都有自己的ACL。
节点类型:ZK中有几种节点类型,节点类型在节点创建的时候就被确定且不可改变
  • 临时节点(EPHEMERAL):临时创建的,会话结束节点自动被删除,也可以手动删除,临时节点不能拥有子节点
  • 临时顺序节点(EPHEMERAL_SEQUENTIAL):具有临时节点特征,但是它会有序列号,分布式锁中会用到该类型节点
  • 持久节点(PERSISTENT):创建后永久存在,除非主动删除。
  • 持久顺序节点(PERSISTENT_SEQUENTIAL):该节点创建后持久存在,相对于持久节点它会在节点名称后面自动增加一个10位数字的序列号,这个计数对于此节点的父节点是唯一,如果这个序列号大于2^32-1就会溢出。
通过 zkCli.sh 进入本地zk命令行界面。若需要连接其他的zk服务需要加上Ip和端口 zkCli.sh -server 192.168.0.15:2181
创建znode节点
create /znode1 持久节点
create -s /znode1 持久顺序节点
create -e /znode1 临时节点
create -e -s /znode1 临时顺序节点
查看znode节点
znode节点的信息
stat /znode1
列出/下面的所有znode节点
ls /
给节点赋值
节点就是key 值就是value
set /znode1 “我是znode1的value”
获取znode节点的值
get /znode1
删除znode节点
delete /znode1
知道了上面这些基础,我们就可以通过python 实现配置文件的同步了。
首先安装zookeeper模块
pip3 install kazoo
代码块:节点创建,修改,删除
创建:
from kazoo.client import KazooClient
def create():
# 连接zk
zk=KazooClient(hosts=”192.168.0.241:2181″)
zk.start()
#将配置文件存入到变量
with open(‘D:\my.cnf’, ‘rb’) as f:
config_file_value=f.read()
# print(config_file_value)
# 创建节点:makepath 设置为 True ,父节点不存在则创建,其他参数不填均为默认
zk.create(‘/mysql/my.cnf’, config_file_value, makepath=True)
config_info = zk.get(‘/mysql/my.cnf’)
print(config_info)
# 关闭zk连接
zk.stop()
修改:
from kazoo.client import KazooClient
def modify():
# 连接zk
zk=KazooClient(hosts=”192.168.0.241:2181″)
zk.start()
#将配置文件存入到变量
with open(‘D:\my.cnf’, ‘rb’) as f:
config_file_value=f.read()
# print(config_file_value)
# set修改节点信息
zk.set(‘/mysql/my.cnf’, config_file_value)
config_info = zk.get(‘/mysql/my.cnf’)
print(config_info)
# 关闭zk连接
zk.stop()
删除:
def delete():
# 连接zk
zk=KazooClient(hosts=”192.168.0.241:2181″)
zk.start()
# 参数 recursive:若为 False,当需要删除的节点存在子节点,会抛异常 NotEmptyError 。若为True,则删除 此节点 以及 删除该节点的所有子节点
zk.delete(‘/mysql/my.cnf’,recursive=False)
# 关闭zk连接
zk.stop()
实现配置文件同步逻辑:
首先服务端代码就是修改节点的值
客户端获取节点值,并写入到配置文件
服务端机器代码:
#!/usr/bin/env python3
# encoding: utf-8
“””
@version: Python 3.8.2
@author: ycy
@file: zk-server.py.py
@time: 2020/12/18 16:29
“””
from kazoo.client import KazooClient
def create():
#将配置文件存入到变量
with open(‘D:\my.cnf’, ‘rb’) as f:
config_file_value=f.read()
# print(config_file_value)
# 创建节点:makepath 设置为 True ,父节点不存在则创建,其他参数不填均为默认
zk.create(‘/mysql/my.cnf’, config_file_value, makepath=True)
config_info = zk.get(‘/mysql/my.cnf’)
print(config_info)
def modify():
#将配置文件存入到变量
with open(‘D:\my.cnf’, ‘rb’) as f:
config_file_value=f.read()
# print(config_file_value)
# set修改节点信息
zk.set(‘/mysql/my.cnf’, config_file_value)
config_info = zk.get(‘/mysql/my.cnf’)
print(config_info)
def running():
#判断节点是否存在,不不存在直接创建即可,存在则修改即可
stat = zk.exists(“/mysql/my.cnf”)
if stat is None:
print(“/mysql/my.cnf节点不存在,直接创建即可”)
# 创建节点,并将配置文件写入到节点
create()
else:
print(“/mysql/my.cnf节点存在,直接修改即可”)
# 修改节点数据,
modify()
if __name__ == ‘__main__’:
# 连接zk
zk=KazooClient(hosts=”192.168.0.241:2181″)
zk.start()
running()
# 关闭zk连接
zk.stop()
客户端机器代码:
#!/usr/bin/env python3
# encoding: utf-8
“””
@version: Python 3.8.2
@author: ycy
@file: zk-client.py
@time: 2020/12/18 17:14
“””
from kazoo.client import KazooClient
def get_config():
#获取配置文件信息
config_info = zk.get(‘/mysql/my.cnf’)
print(config_info[0])
config_info = str(config_info[0],encoding=’utf-8′)
print(config_info)
#将配置文件信息保存到文件中
with open(‘D:\my.cnf’, ‘w’) as f:
f.write(config_info)
def running():
#判断节点是否存在,不不存在直接抛出错误
stat = zk.exists(“/mysql/my.cnf”)
if stat is None:
print(“/mysql/my.cnf节点不存在!!!,程序退出”)
exit(110)
else:
print(“/mysql/my.cnf节点存在,开始同步配置”)
get_config()
if __name__ == ‘__main__’:
# 连接zk
zk=KazooClient(hosts=”192.168.0.241:2181″)
zk.start()
running()
# 关闭zk连接
zk.stop()
上面是同步配置的案例,实时上zk用于分布式锁的情景会比较适用
通过python实现分布式锁获取:
</div>
<pre>#!/usr/bin/env python3
# encoding: utf-8

"""
@version: Python 3.8.2
@author: ycy
@file: zk-lock.py
@time: 2020/12/19 10:33
"""
import logging,os,time
from kazoo.client import KazooClient
from kazoo.recipe.lock import Lock


class zk_lock():
    def __init__(self,hosts,name,logger=None,timeout=2):
        """
        :param hosts:  zk地址
        :param name:  分布式锁名称
        :param logger:  日志对象
        :param timeout:  连接超时时间
        """
        self._client = None
        self._lock = None

        #连接zk并初始化连接
        try:
            self._client = KazooClient(hosts=hosts,logger=logger,timeout=timeout)
            self._client.start(timeout=timeout)
        except Exception as e:
            logging.error('连接zk失败,初始化zk对象失败,错误信息:{}'.format(e))

        #连接成功后创建一个Lock对象资源,用于客户端来获取这个锁资源
        try:
            lock_path = os.path.join("/","locks",name)
            self._lock = Lock(self._client,lock_path)
        except Exception as e:
            logging.error("连接zk成功,但是创建锁对象失败,错误信息:{}".format(e))

    def acquire(self,blocking=True,timeout=None):
        """
        获取锁资源
        :param blocking:
        :param timeout:
        :return:
        """
        if self._lock is None:
            return False
        try:
            return self._lock.acquire(blocking=blocking,timeout=timeout)
        except Exception as e:
            logging.error("获取锁资源失败,错误信息:{}".format(e))
            return False

    def release(self):
        """
        释放锁资源
        :return:
        """
        if self._lock is not None:
            self._lock.release()
            logging.info("锁资源已被释放")


    def __enter__(self):
        """
        上下文管理,与with搭配使用
        :return:
        """
        if not self.acquire():
            raise Exception("获取锁资源失败")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        上下文管理,与with搭配使用
        :param exc_type:
        :param exc_val:
        :param exc_tb:
        :return:
        """
        self.release()

    def __del__(self):
        """
        销毁对象,释放其空间
        :return:
        """
        self.release()
        if self._client:
            #关闭zk连接
            self._client.stop()
            self._client = None

def running():
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    sh = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s')
    sh.setFormatter(formatter)
    logger.addHandler(sh)

    zk_hosts = "192.168.0.241:2181,192.168.0.242:2181,192.168.0.15:2181"
    name = "lock_test"

    lock = zk_lock(zk_hosts,name,logger=logger)
    # with语句的工作原理:
    # 紧跟with后面的语句会被求值,返回对象的__enter__()方法被调用,这个方法的返回值将被赋值给as关键字后面的变量,当with后面的代码块全部被执行完之后,将调用前面返回对象的__exit__()方法。
    #  with语句最关键的地方在于被求值对象必须有__enter__()和__exit__()这两个方法,那我们就可以通过自己实现这两方法来自定义with语句处理异常。
    with lock:
        logging.info('获取锁成功,工作20秒')
        for i in range(1,21):
            time.sleep(1)
            print("工作第{}秒".format(i))

    logging.info('工作完成,锁已被释放')
    lock.release()

if __name__ == '__main__':
    running()

 

可以看到分布式锁是按时序来获取和释放的。
另一个类似的脚本,测试也是可以实现锁功能,比较繁琐的:
</div>
<pre>#!/usr/bin/env python3
# encoding: utf-8

"""
@version: Python 3.8.2
@author: ycy
@file: zk-lock2.py
@time: 2020/12/19 16:58
"""

import logging, os, time
from kazoo.client import KazooClient
from kazoo.client import KazooState
from kazoo.recipe.lock import Lock


class ZooKeeperLock():
    def __init__(self, hosts, id_str, lock_name, logger=None, timeout=1):
        self.hosts = hosts
        self.id_str = id_str
        self.zk_client = None
        self.timeout = timeout
        self.logger = logger
        self.name = lock_name
        self.lock_handle = None

        self.create_lock()

    def create_lock(self):
        try:
            self.zk_client = KazooClient(hosts=self.hosts, logger=self.logger, timeout=self.timeout)
            self.zk_client.start(timeout=self.timeout)
        except Exception as ex:
            self.init_ret = False
            self.err_str = "Create KazooClient failed! Exception: %s" % str(ex)
            logging.error(self.err_str)
            return

        try:
            lock_path = os.path.join("/", "locks", self.name)
            self.lock_handle = Lock(self.zk_client, lock_path)
        except Exception as ex:
            self.init_ret = False
            self.err_str = "Create lock failed! Exception: %s" % str(ex)
            logging.error(self.err_str)
            return

    def destroy_lock(self):
        # self.release()

        if self.zk_client != None:
            self.zk_client.stop()
            self.zk_client = None

    def acquire(self, blocking=True, timeout=None):
        if self.lock_handle == None:
            return None

        try:
            return self.lock_handle.acquire(blocking=blocking, timeout=timeout)
        except Exception as ex:
            self.err_str = "Acquire lock failed! Exception: %s" % str(ex)
            logging.error(self.err_str)
            return None

    def release(self):
        if self.lock_handle == None:
            return None
        return self.lock_handle.release()

    def __del__(self):
        self.destroy_lock()


def main():
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    sh = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s')
    sh.setFormatter(formatter)
    logger.addHandler(sh)

    zookeeper_hosts = "192.168.0.241:2181,192.168.0.242:2181,192.168.0.15:2181"
    name = "lock_test"

    lock = ZooKeeperLock(zookeeper_hosts, "myid is 1", name, logger=logger)
    ret = lock.acquire()
    if not ret:
        logging.info("Can't get lock! Ret: %s", ret)
        return

    logging.info("Get lock! Do something! Sleep 10 secs!")
    for i in range(1, 41):
        time.sleep(1)
        print("工作第{}秒".format(i))

    lock.release()


if __name__ == "__main__":
    try:
        main()
    except Exception as ex:
        print
        "Ocurred Exception: %s" % str(ex)
        quit()</pre>

 

有个疑问是:我在win机器的pycharm执行这些代码,zk上并没有锁节点的。。就是说程序并没有创建成功锁资源。。但是linux机器上执行又是可以的。。

nginx 防盗链

一、盗链原理 盗链是一种损害原有网站合法利益,给原网站所在服务器造成额外负担的非法行为。要采取防盗链的措施,首先需要了解盗链的实现原理。 客户端向服...

阅读全文

ssl证书安装

一、证书申请 云服务器上的证书申请,如腾讯云,在云产品->域名管理->ssl证书管理-申请一个免费的证书。如图: 申请完之后,等审核完毕就会发通知到你...

阅读全文

编译安装php

Linux 6 下编译安装 PHP 5.6实例详解 PHP(外文名:PHP: Hypertext Preprocessor,中文名:“超文本预处理器”)是一种通用开源脚本语言。语法吸收了C语言...

阅读全文

欢迎留言