Lettuce同步命令源码分析

北京赛车pk10软件计划手机版下载 www.3gt5.cn   Lettuce同步源码分析

    在上一篇分享中分享了单机模式异步连接创建过程Lettuce创建连接过程源码分析; 在本次分享内容主要介绍同步命令的处理过程.

Lettuce是基于Netty的Redis高级客户端,对于异步命令来说是天然的,那么lettuce中是如何处理同步命令的呢?实际上同步连接还是对异步命令的一次封装;下面我们就通过源码进行分析看看Lettuce中的具体实现.

   通过上一篇文章中可以知道在StatefulRedisConnectionImpl中创建 异步模式,同步模式以及响应式模式命令处理模式,那么我们就从 该处看起

    public StatefulRedisConnectionImpl(RedisChannelWriter writer, RedisCodec<K, V> codec, Duration timeout) {

        super(writer, timeout);

        this.codec = codec;
        //创建异步redis命令处理模式
        this.async = newRedisAsyncCommandsImpl();
        //创建redis命令同步处理模式
        this.sync = newRedisSyncCommandsImpl();
        //创建redis命令响应式处理模式
        this.reactive = newRedisReactiveCommandsImpl();
    }

   通过这里似乎看不出同步处理模式同异步处理模式有什么关联,那么我们在深入进去看一下

protected RedisCommands<K, V> newRedisSyncCommandsImpl() {
        return syncHandler(async(), RedisCommands.class, RedisClusterCommands.class);
    }

  在这段代码中可以看到async(),这个就是redis命令异步处理模式,那么它是如何封装的呢?

protected <T> T syncHandler(Object asyncApi, Class<?>... interfaces) {
        //对异步API创建调用处理器
        FutureSyncInvocationHandler h = new FutureSyncInvocationHandler((StatefulConnection<?, ?>) this, asyncApi, interfaces);
        //创建动态代理
        return (T) Proxy.newProxyInstance(AbstractRedisClient.class.getClassLoader(), interfaces, h);
    }

  通过上面对源码可以发现原来是对异步api创建了一个JDK动态代理;那么关键的逻辑还是在FutureSyncInvocationHandler中,对于动态代理的知识就不在展开了.

在invoke处理是在AbstractInvocationHandler中完成的,它将一些基本公用的抽象在了基类中,将特殊的实现延迟到子类中实现.

 public final Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        //如果参数为null则 将args设置为"{}"
        if (args == null) {
            args = NO_ARGS;
        }
        //如果参数长度为0同时方法名称为hashCode则直接返回hashCode
        if (args.length == 0 && method.getName().equals("hashCode")) {
            return hashCode();
        }
        //如果是equals
        if (args.length == 1 && method.getName().equals("equals") && method.getParameterTypes()[0] == Object.class) {
            Object arg = args[0];
            if (arg == null) {
                return false;
            }
            if (proxy == arg) {
                return true;
            }
            return isProxyOfSameInterfaces(arg, proxy.getClass()) && equals(Proxy.getInvocationHandler(arg));
        }
        //如果是toString
        if (args.length == 0 && method.getName().equals("toString")) {
            return toString();
        }
        return handleInvocation(proxy, method, args);
    }

  在FutureSyncInvocationHandler中实现了同步命令处理过程,其源码如下:

 protected Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable {

        try {
            //获取当前method在asyncApi 中对应的方法
            Method targetMethod = this.translator.get(method);
            //调用异步接口
            Object result = targetMethod.invoke(asyncApi, args);
            //如果返回结果是RedisFuture类型
            if (result instanceof RedisFuture<?>) {
               //类型强转
                RedisFuture<?> command = (RedisFuture<?>) result;
                  //如果不是事务控制方法 同时还在事务中则返回null
                if (isNonTxControlMethod(method.getName()) && isTransactionActive(connection)) {
                    return null;
                }
                //是事务控制方法,或不在事务中则进行如下处理
                //等待超时或取消
                LettuceFutures.awaitOrCancel(command, connection.getTimeout().toNanos(), TimeUnit.NANOSECONDS);
               //返回结果,这里处理不是很好 上一步中就可以直接返回了
                return command.get();
            }
            //如果不是RedisFuture类型则直接返回
            return result;
        } catch (InvocationTargetException e) {
            throw e.getTargetException();
        }
    }

  在上文中有一段是获取获取指定方法在delegate中对应方法的处理,下面就看看这个处理是如何实现的

/**
     * 方法翻译器
     */
    protected static class MethodTranslator {

        private final static WeakHashMap<Class<?>, MethodTranslator> TRANSLATOR_MAP = new WeakHashMap<>(32);
        
        //真实方法和代理类中方法映射表
        private final Map<Method, Method> map;

        private MethodTranslator(Class<?> delegate, Class<?>... methodSources) {

            map = createMethodMap(delegate, methodSources);
        }

        /**
         * 通过指定代理类,和目标类创建方法翻译器
         */
        public static MethodTranslator of(Class<?> delegate, Class<?>... methodSources) {
            //同步代码块
            synchronized (TRANSLATOR_MAP) {
                //如果翻译器映射表中不存在delegate的翻译器则创建一个新的
                return TRANSLATOR_MAP.computeIfAbsent(delegate, key -> new MethodTranslator(key, methodSources));
            }
        }

        private Map<Method, Method> createMethodMap(Class<?> delegate, Class<?>[] methodSources) {

            Map<Method, Method> map;
            List<Method> methods = new ArrayList<>();
            //遍历源类,找到所有public方法
            for (Class<?> sourceClass : methodSources) {
                methods.addAll(getMethods(sourceClass));
            }

            map = new HashMap<>(methods.size(), 1.0f);

            //创建方法和代理类的方法的映射表
            for (Method method : methods) {

                try {
                    map.put(method, delegate.getMethod(method.getName(), method.getParameterTypes()));
                } catch (NoSuchMethodException ignore) {
                }
            }
            return map;
        }
       //获取目标方法中的所有方法
        private Collection<? extends Method> getMethods(Class<?> sourceClass) {

            //目标方法集合
            Set<Method> result = new HashSet<>();

            Class<?> searchType = sourceClass;
            while (searchType != null && searchType != Object.class) {
                 //将目标类中所有public方法添加到集合中
                result.addAll(filterPublicMethods(Arrays.asList(sourceClass.getDeclaredMethods())));
                //如果souceClass是接口类型
                if (sourceClass.isInterface()) {
                    //获取souceClass的所有接口
                    Class<?>[] interfaces = sourceClass.getInterfaces();
                    //遍历接口,将接口的public方法也添加到方法集合中
                    for (Class<?> interfaceClass : interfaces) {
                        result.addAll(getMethods(interfaceClass));
                    }

                    searchType = null;
                } else {//如果不是接口则查找父类

                    searchType = searchType.getSuperclass();
                }
            }

            return result;
        }

        //获取给定方法集合中所有public方法
        private Collection<? extends Method> filterPublicMethods(List<Method> methods) {
            List<Method> result = new ArrayList<>(methods.size());

            for (Method method : methods) {
                if (Modifier.isPublic(method.getModifiers())) {
                    result.add(method);
                }
            }

            return result;
        }

        public Method get(Method key) {
           //从方法映射表中获取目标方法
            Method result = map.get(key);
            //如果目标方法不为null则返回,否则抛出异常
            if (result != null) {
                return result;
            }
            throw new IllegalStateException("Cannot find source method " + key);
        }
    }
}

  

  

   

posted @ 2018-07-01 11:05 wei_zw 阅读(...) 评论(...) 编辑 收藏
  • 周国平:男女之爱已经很强烈了,但亲子之爱更强烈 2019-05-22
  • 君弘精益精牌讲师投资课 2019-05-22
  • 弹幕评论别降低了审美品位 2019-05-20
  • 人民网评:掌握核心技术,才不会被卡脖子 2019-05-05
  • 燕山谭客.blog的博客—强国博客—人民网 2019-05-04
  • 陕西国防工业职业技术学院百名大学生志愿者敬老院慰问孤寡老人陕西国防工业职业技术学院百名大学生志愿者敬老院慰问-陕西教育新闻 2019-05-04
  • 天津市环境保护突出问题边督边改公开信息 2019-04-29
  • 自作多情。先将台湾收回来再说也不迟啊。 2019-04-25
  • 【世界杯·望俄打卦】突尼斯VS英格兰 2019-04-25
  • 紫光阁中共中央国家机关工作委员会 2019-04-19
  • 学习贯彻落实十九大精神 2019-04-16
  • 全国啦啦操联赛临汾站开幕 2019-03-31
  • 讴歌新时代 讴歌新西藏——国家艺术基金2017年度传播交流推广项目民族音乐会《西藏春天》巡演启动 2019-03-31
  • 【访民情 惠民生 聚民心】吾其村“双膜瓜”为精准脱贫助力 2019-03-29
  • 努比亚Z17(尊享版全网通)参数 2019-03-29
  • 646| 871| 404| 502| 372| 274| 771| 715| 955| 752|