redis实现分布式锁
方法1:普通实现方案
实现方式: 使用指令: set key 随机值 ex 5 nx.意思是当key不存在的时候设置key. 如果key存在返回OK,否则返回nil.
实现过程:
1.执行命令set key true ex 5 nx2.如果成功,执行其他业务逻辑3.使用lua脚本删除key:if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1])else return 0end
为什么要设置过期时间?防止加锁的客户端异常导致删除key的命令没执行,那么就会产生死锁。为什么value要使用随机值?因为设置了超时时间,如果获得锁的线程执行时间过长,导致key自动过期,然后被别的线程获得锁。 所以随机值是为了反正误删别的锁。
未获得锁的处理: 客户端在处理请求时加锁没加成功怎么办。一般有 3 种策略来处理加锁失败:
直接抛出异常,通知用户稍后重试;sleep 一会再重试;将请求转移至延时队列,过一会再试;
总结:
这个方案的弊端:1. 因为如果业务逻辑执行时间过长导致自动过期,别的线程又获得了锁。可能会出现同一把锁被两个线程持有的情况。2. 如果没有获得锁,客户端需要不断重试,影响客户端性能;3. 在哨兵模式下,发生故障转移时,由于主从之间是异步同步,可能获得锁的命令还没有被同步到slave,刚好发生了故障转移,也可能会出现同一把锁被两个线程持有的情况。
方法2:Redlock 算法
Redlock 算法是redis官方支持的分布式锁算法。 目的是就是为了解决集群故障转移时可能发生的同一把锁被两个线程持有的问题。
总之这个算法比较复杂,不建议使用。。。。。
zookeeper实现分布式锁
方法1:多个客户端监听同一个节点
实现思路:
多个客户端同时创建一个相同的临时节点,zk可以保证一定只有一个客户端创建成功,创建成功的就获得了锁。 创建不成功的客户端监听这个节点,如果监听到节点删除事件,那么再次尝试创建节点。
案例:利用分布式锁的思路实现master选举。
/** * 测试leader选举 * * @author leiqian * */public class MasterSelect { private static Logger LOG = LoggerFactory.getLogger(MasterSelect.class); /** zookeeper地址 */ static final String CONNECT_ADDR = "192.168.99.100:2181"; /** session超时时间 */ static final int SESSION_OUTTIME = 20000;// ms static final String PATH = "/master_select"; static CountDownLatch SEMAPHORE; public static void main(String[] args) throws InterruptedException { // 1 重试策略:初试时间为1s 重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); // 2 通过工厂创建连接 CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_OUTTIME).retryPolicy(retryPolicy).build(); // 3 开启连接 cf.start(); // 注册监听器 registerListener(cf); // 开始选举 select(cf); Thread.sleep(Long.MAX_VALUE); } /** * 注册子节点监听器 * * @param cf */ public static void registerListener(CuratorFramework cf) { // 监听子节点 final PathChildrenCache childrenCache = new PathChildrenCache(cf, PATH, false); try { childrenCache.start(StartMode.POST_INITIALIZED_EVENT); } catch (Exception e1) { e1.printStackTrace(); } childrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_ADDED: // System.out.println("CHILD_ADDED," + event.getData().getPath()); break; case CHILD_UPDATED: // System.out.println("CHILD_UPDATED," + event.getData().getPath()); break; case CHILD_REMOVED: // System.out.println("CHILD_REMOVED," + event.getData().getPath()); // 再次开始参与选举 SEMAPHORE.countDown(); break; default: break; } } }); } /** * 进行选举 * * @param cf * @throws InterruptedException */ public static void select(CuratorFramework cf) throws InterruptedException { SEMAPHORE = new CountDownLatch(1); try { cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH + "/lock"); LOG.info("master选举成功"); work(); removeMaster(cf); LOG.info("等待选举"); SEMAPHORE.await(); LOG.info("下一次选举开始"); select(cf); } catch (Exception e) { LOG.info("master选举失败"); LOG.info("等待选举"); SEMAPHORE.await(); LOG.info("下一次选举开始"); select(cf); } } /** * 移除master * * @param cf */ public static void removeMaster(CuratorFramework cf) { try { cf.delete().forPath(PATH + "/lock"); } catch (Exception e) { e.printStackTrace(); } LOG.info("逻辑完成,master退出"); } /** * 执行业务逻辑 */ public static void work() { LOG.info("开始执行业务逻辑"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } }}
注意:curator框架的curator-recipes包中内置了master选举,分布式锁,分布式计数器,分布式barrier的实现,这里的代码只是为了演示思路。
方法2:利用顺序临时节点实现分布式锁
实现思路:
每个客户端创建一个临时顺序节点,然后获取所有的顺序节点,如果当前节点排序后在第一个位置,那么当前client获得锁。 如果不是在第一个位置,监听前一个节点。当前一个删除后,当前client获得锁。
/** * 利用顺序临时节点实现分布式锁 * * @author leiqian * */public class DistributedLock { private static Logger LOG = LoggerFactory.getLogger(MasterSelect.class); private CuratorFramework cf; private String rootPath = "/dirtributed_lock"; private String path = rootPath + "/lock"; private CountDownLatch semaphore; /** 当前节点 */ private String curNode; /** 需要的监听节点 */ private String lockNode; private DistributedLock(String connectStr, int seesionTimeOut) { // 1 重试策略:初试时间为1s 重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); // 2 通过工厂创建连接 cf = CuratorFrameworkFactory.builder().connectString(connectStr).sessionTimeoutMs(seesionTimeOut) .retryPolicy(retryPolicy).build(); // 3 开启连接 cf.start(); } public static void main(String[] args) throws Exception { // zookeeper地址 String connectStr = "192.168.99.100:2181"; // session超时时间 int seesionTimeOut = 20000;// ms DistributedLock lock = new DistributedLock(connectStr, seesionTimeOut); // 获取分布式锁 lock.acquireLock(); Thread.sleep(Long.MAX_VALUE); } /** * 注册节点监听器 * * @param cf */ public void registerListener(String nodePath) { // 监听节点 NodeCache cache = new NodeCache(cf, nodePath, false); try { cache.start(true); } catch (Exception e) { e.printStackTrace(); } cache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { semaphore.countDown(); } }); } private void acquireLock() throws Exception { if (!tryLock()) { LOG.info("等待锁"); waitForLock(); } LOG.info("获得锁.."); work(); releaseLock(); } /** * 尝试获取锁 * * @return * @throws Exception */ private boolean tryLock() throws Exception { this.curNode = cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL) .forPath(this.path); PathAndNode pn = ZKPaths.getPathAndNode(this.curNode); // 获取父节点下面的所有子节点 ListchildNodes = cf.getChildren().forPath(this.rootPath); // 节点排序 Collections.sort(childNodes); if (pn.getNode().equals(childNodes.get(0))) { return true; } else { // 获取前一个node this.lockNode = this.rootPath + "/" + childNodes.get(childNodes.lastIndexOf(pn.getNode()) - 1); return false; } } /** * 等待锁 * * @throws Exception */ private void waitForLock() throws Exception { Stat stat = cf.checkExists().forPath(this.lockNode); if (stat != null) { semaphore = new CountDownLatch(1); // 监听节点 registerListener(lockNode); semaphore.await(); } } /** * 释放锁 */ private void releaseLock() { try { LOG.info("释放锁"); cf.delete().forPath(this.curNode); } catch (Exception e) { e.printStackTrace(); } } /** * 执行业务逻辑 */ private void work() { LOG.info("执行业务逻辑"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } }}
redis和zk实现分布式锁的优劣
1.redis分布式锁,其实需要自己不断去尝试获取锁,比较消耗性能;zk分布式锁,获取不到锁,注册个监听器即可,不需要不断主动尝试获取锁,性能开销较小2. 如果是redis获取锁的那个客户端bug了或者挂了,那么只能等待超时时间之后才能释放锁;而zk的话,因为创建的是临时znode,只要客户端挂了,znode就没了,此时就自动释放锁总得来说利用zk来实现分布式锁更好更安全。