博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
分布式锁的实现
阅读量:7006 次
发布时间:2019-06-27

本文共 8106 字,大约阅读时间需要 27 分钟。

hot3.png

redis实现分布式锁

方法1:普通实现方案

实现方式: 使用指令: set key 随机值 ex 5 nx.意思是当key不存在的时候设置key. 如果key存在返回OK,否则返回nil.

实现过程:

1.执行命令set key true ex 5 nx2.如果成功,执行其他业务逻辑3.使用lua脚本删除key:if redis.call("get",KEYS[1]) == ARGV[1] then    return redis.call("del",KEYS[1])else    return 0end
为什么要设置过期时间?防止加锁的客户端异常导致删除key的命令没执行,那么就会产生死锁。为什么value要使用随机值?因为设置了超时时间,如果获得锁的线程执行时间过长,导致key自动过期,然后被别的线程获得锁。 所以随机值是为了反正误删别的锁。

未获得锁的处理: 客户端在处理请求时加锁没加成功怎么办。一般有 3 种策略来处理加锁失败:

直接抛出异常,通知用户稍后重试;sleep 一会再重试;将请求转移至延时队列,过一会再试;

总结:

这个方案的弊端:1. 因为如果业务逻辑执行时间过长导致自动过期,别的线程又获得了锁。可能会出现同一把锁被两个线程持有的情况。2. 如果没有获得锁,客户端需要不断重试,影响客户端性能;3. 在哨兵模式下,发生故障转移时,由于主从之间是异步同步,可能获得锁的命令还没有被同步到slave,刚好发生了故障转移,也可能会出现同一把锁被两个线程持有的情况。

方法2:Redlock 算法

Redlock 算法是redis官方支持的分布式锁算法。 目的是就是为了解决集群故障转移时可能发生的同一把锁被两个线程持有的问题。

总之这个算法比较复杂,不建议使用。。。。。

zookeeper实现分布式锁

方法1:多个客户端监听同一个节点

实现思路:

多个客户端同时创建一个相同的临时节点,zk可以保证一定只有一个客户端创建成功,创建成功的就获得了锁。 创建不成功的客户端监听这个节点,如果监听到节点删除事件,那么再次尝试创建节点。

案例:利用分布式锁的思路实现master选举。

/** * 测试leader选举 *  * @author leiqian * */public class MasterSelect {    private static Logger LOG = LoggerFactory.getLogger(MasterSelect.class);    /** zookeeper地址 */    static final String CONNECT_ADDR = "192.168.99.100:2181";    /** session超时时间 */    static final int SESSION_OUTTIME = 20000;// ms    static final String PATH = "/master_select";    static CountDownLatch SEMAPHORE;    public static void main(String[] args) throws InterruptedException {        // 1 重试策略:初试时间为1s 重试10次        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);        // 2 通过工厂创建连接        CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR)                .sessionTimeoutMs(SESSION_OUTTIME).retryPolicy(retryPolicy).build();        // 3 开启连接        cf.start();        // 注册监听器        registerListener(cf);        // 开始选举        select(cf);        Thread.sleep(Long.MAX_VALUE);    }    /**     * 注册子节点监听器     *      * @param cf     */    public static void registerListener(CuratorFramework cf) {        // 监听子节点        final PathChildrenCache childrenCache = new PathChildrenCache(cf, PATH, false);        try {            childrenCache.start(StartMode.POST_INITIALIZED_EVENT);        } catch (Exception e1) {            e1.printStackTrace();        }        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {            @Override            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {                switch (event.getType()) {                case CHILD_ADDED:                    // System.out.println("CHILD_ADDED," + event.getData().getPath());                    break;                case CHILD_UPDATED:                    // System.out.println("CHILD_UPDATED," + event.getData().getPath());                    break;                case CHILD_REMOVED:                    // System.out.println("CHILD_REMOVED," + event.getData().getPath());                    // 再次开始参与选举                    SEMAPHORE.countDown();                    break;                default:                    break;                }            }        });    }    /**     * 进行选举     *      * @param cf     * @throws InterruptedException     */    public static void select(CuratorFramework cf) throws InterruptedException {        SEMAPHORE = new CountDownLatch(1);        try {            cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH + "/lock");            LOG.info("master选举成功");            work();            removeMaster(cf);            LOG.info("等待选举");            SEMAPHORE.await();            LOG.info("下一次选举开始");            select(cf);        } catch (Exception e) {            LOG.info("master选举失败");            LOG.info("等待选举");            SEMAPHORE.await();            LOG.info("下一次选举开始");            select(cf);        }    }    /**     * 移除master     *      * @param cf     */    public static void removeMaster(CuratorFramework cf) {        try {            cf.delete().forPath(PATH + "/lock");        } catch (Exception e) {            e.printStackTrace();        }        LOG.info("逻辑完成,master退出");    }    /**     * 执行业务逻辑     */    public static void work() {        LOG.info("开始执行业务逻辑");        try {            Thread.sleep(5000);        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

注意:curator框架的curator-recipes包中内置了master选举,分布式锁,分布式计数器,分布式barrier的实现,这里的代码只是为了演示思路。

方法2:利用顺序临时节点实现分布式锁

实现思路:

每个客户端创建一个临时顺序节点,然后获取所有的顺序节点,如果当前节点排序后在第一个位置,那么当前client获得锁。 如果不是在第一个位置,监听前一个节点。当前一个删除后,当前client获得锁。
/** * 利用顺序临时节点实现分布式锁 *  * @author leiqian * */public class DistributedLock {    private static Logger LOG = LoggerFactory.getLogger(MasterSelect.class);    private CuratorFramework cf;    private String rootPath = "/dirtributed_lock";    private String path = rootPath + "/lock";    private CountDownLatch semaphore;    /** 当前节点 */    private String curNode;    /** 需要的监听节点 */    private String lockNode;    private DistributedLock(String connectStr, int seesionTimeOut) {        // 1 重试策略:初试时间为1s 重试10次        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);        // 2 通过工厂创建连接        cf = CuratorFrameworkFactory.builder().connectString(connectStr).sessionTimeoutMs(seesionTimeOut)                .retryPolicy(retryPolicy).build();        // 3 开启连接        cf.start();    }    public static void main(String[] args) throws Exception {        // zookeeper地址        String connectStr = "192.168.99.100:2181";        // session超时时间        int seesionTimeOut = 20000;// ms        DistributedLock lock = new DistributedLock(connectStr, seesionTimeOut);        // 获取分布式锁        lock.acquireLock();        Thread.sleep(Long.MAX_VALUE);    }    /**     * 注册节点监听器     *      * @param cf     */    public void registerListener(String nodePath) {        // 监听节点        NodeCache cache = new NodeCache(cf, nodePath, false);        try {            cache.start(true);        } catch (Exception e) {            e.printStackTrace();        }        cache.getListenable().addListener(new NodeCacheListener() {            @Override            public void nodeChanged() throws Exception {                semaphore.countDown();            }        });    }    private void acquireLock() throws Exception {        if (!tryLock()) {            LOG.info("等待锁");            waitForLock();        }        LOG.info("获得锁..");        work();        releaseLock();    }    /**     * 尝试获取锁     *      * @return     * @throws Exception     */    private boolean tryLock() throws Exception {        this.curNode = cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)                .forPath(this.path);        PathAndNode pn = ZKPaths.getPathAndNode(this.curNode);        // 获取父节点下面的所有子节点        List
childNodes = cf.getChildren().forPath(this.rootPath); // 节点排序 Collections.sort(childNodes); if (pn.getNode().equals(childNodes.get(0))) { return true; } else { // 获取前一个node this.lockNode = this.rootPath + "/" + childNodes.get(childNodes.lastIndexOf(pn.getNode()) - 1); return false; } } /** * 等待锁 * * @throws Exception */ private void waitForLock() throws Exception { Stat stat = cf.checkExists().forPath(this.lockNode); if (stat != null) { semaphore = new CountDownLatch(1); // 监听节点 registerListener(lockNode); semaphore.await(); } } /** * 释放锁 */ private void releaseLock() { try { LOG.info("释放锁"); cf.delete().forPath(this.curNode); } catch (Exception e) { e.printStackTrace(); } } /** * 执行业务逻辑 */ private void work() { LOG.info("执行业务逻辑"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } }}

redis和zk实现分布式锁的优劣

1.redis分布式锁,其实需要自己不断去尝试获取锁,比较消耗性能;zk分布式锁,获取不到锁,注册个监听器即可,不需要不断主动尝试获取锁,性能开销较小2. 如果是redis获取锁的那个客户端bug了或者挂了,那么只能等待超时时间之后才能释放锁;而zk的话,因为创建的是临时znode,只要客户端挂了,znode就没了,此时就自动释放锁总得来说利用zk来实现分布式锁更好更安全。

转载于:https://my.oschina.net/grace233/blog/2885385

你可能感兴趣的文章
复杂而艰辛的重构之路--起步
查看>>
Spring 定时任务之 @Scheduled cron表达式
查看>>
想清楚再入!VR硬件创业能“要你命”
查看>>
Git协作流程(转)
查看>>
web项目部署后heap溢出(jconsole java虚拟机内存管理 tomcat内存管理)
查看>>
No ongoing transaction. Did you forget to call multi?
查看>>
9.6、Libgdx之罗盘
查看>>
JAVA实现AES的加密和解密算法
查看>>
[20170208]关于pre_page_sga参数.txt
查看>>
Ajax请求URL后加随机数原理
查看>>
微信小程序把玩(二)window配置
查看>>
微信公众平台开发(106) 网页获取用户地理位置
查看>>
TEST
查看>>
大数据分析平台解析:什么是Apache Spark?
查看>>
OpenCV stereo matching BM 算法
查看>>
MySQL · 源码分析 · InnoDB LRU List刷脏改进之路
查看>>
Alluxio之定位策略
查看>>
Java和Spring的跨版本升级
查看>>
管理员权限的凭证安全漏洞
查看>>
AI翻译离无障碍交流有多远
查看>>