纯天然绿色学渣
  • <i class="menu-item-icon fa fa-fw fa-home"></i> <br/>首页
  • <i class="menu-item-icon fa fa-fw fa-tags"></i> <br/>标签<span class="badge">28</span>
  • <i class="menu-item-icon fa fa-fw fa-th"></i> <br/>分类<span class="badge">12</span>
  • <i class="menu-item-icon fa fa-fw fa-archive"></i> <br/>归档<span class="badge">42</span>
  • <i class="menu-item-icon fa fa-fw fa-tree"></i> <br/>工具

  • 搜索

spring - DI

发表于 2019-10-22 | 更新于 2019-11-14 | 分类于 Java | 评论数: | 阅读次数:

依赖注入发生的时间

当 Spring IOC 容器完成了 Bean 定义资源的定位、载入和解析注册以后,IOC 容器中已经管理类 Bean
定义的相关数据,但是此时 IOC 容器还没有对所管理的 Bean 进行依赖注入,依赖注入在以下两种情况
发生:

  • 用户第一次通过 getBean 方法向 IOC 容索要 Bean 时,IOC 容器触发依赖注入。
  • 当用户在 Bean 定义资源中为<Bean>元素配置了 lazy-init 属性, 默认是false,即让容器在解析注册 Bean 定义时进行预实例化,触发依赖注入。
    BeanFactory 接口定义了 Spring IOC 容器的基本功能规范,是 Spring IOC 容器所应遵守的最底层和
    最基本的编程规范。BeanFactory 接口中定义了几个 getBean 方法,就是用户向 IOC 容器索取管理的 Bean
    的方法,我们通过分析其子类的具体实现,理解 Spring IOC 容器在用户索取 Bean 时如何完成依赖注
    入。

    在 BeanFactory 中我们看到 getBean(String…)函数,它的具体实现在 AbstractBeanFactory 中

AbstractBeanFactory 通过 getBean 向 IOC 容器获取被管理的 Bean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
 //获取 IOC 容器中指定名称的 Bean
public Object getBean(String name) throws BeansException {
//doGetBean 才是真正向 IoC 容器获取被管理 Bean 的过程
return doGetBean(name, null, null, false);
}
//获取 IOC 容器中指定名称和类型的 Bean
public <T> T getBean(String name, Class<T> requiredType) throws BeansException {
//doGetBean 才是真正向 IoC 容器获取被管理 Bean 的过程
return doGetBean(name, requiredType, null, false);
}
//获取 IOC 容器中指定名称和参数的 Bean
public Object getBean(String name, Object... args) throws BeansException {
//doGetBean 才是真正向 IoC 容器获取被管理 Bean 的过程
return doGetBean(name, null, args, false);
}
//获取 IOC 容器中指定名称、类型和参数的 Bean
public <T> T getBean(String name, Class<T> requiredType, Object... args) throws BeansException {
//doGetBean 才是真正向 IoC 容器获取被管理 Bean 的过程
return doGetBean(name, requiredType, args, false);
}
//真正实现向 IOC 容器获取 Bean 的功能,也是触发依赖注入功能的地方
@SuppressWarnings("unchecked")
protected <T> T doGetBean(
final String name, final Class<T> requiredType, final Object[] args, boolean typeCheckOnly)
throws BeansException {
//根据指定的名称获取被管理 Bean 的名称,剥离指定名称中对容器的相关依赖
//如果指定的是别名,将别名转换为规范的 Bean 名称
final String beanName = transformedBeanName(name);
Object bean;
//先从缓存中取是否已经有被创建过的单态类型的 Bean
//对于单例模式的 Bean 整个 IOC 容器中只创建一次,不需要重复创建
Object sharedInstance = getSingleton(beanName);
//IOC 容器创建单例模式 Bean 实例对象
if (sharedInstance != null && args == null) {
if (logger.isDebugEnabled()) {
//如果指定名称的 Bean 在容器中已有单例模式的 Bean 被创建
//直接返回已经创建的 Bean
if (isSingletonCurrentlyInCreation(beanName)) {
logger.debug("Returning eagerly cached instance of singleton bean '" + beanName +
"' that is not fully initialized yet - a consequence of a circular reference");
}
else {
logger.debug("Returning cached instance of singleton bean '" + beanName + "'");
}
}
//获取给定 Bean 的实例对象,主要是完成 FactoryBean 的相关处理
//注意:BeanFactory 是管理容器中 Bean 的工厂,而 FactoryBean 是
//创建创建对象的工厂 Bean,两者之间有区别
bean = getObjectForBeanInstance(sharedInstance, name, beanName, null);
}
else {
//缓存没有正在创建的单例模式 Bean
//缓存中已经有已经创建的原型模式 Bean
//但是由于循环引用的问题导致实例化对象失败
if (isPrototypeCurrentlyInCreation(beanName)) {
throw new BeanCurrentlyInCreationException(beanName);
}
//对 IOC 容器中是否存在指定名称的 BeanDefinition 进行检查,首先检查是否
//能在当前的 BeanFactory 中获取的所需要的 Bean,如果不能则委托当前容器
//的父级容器去查找,如果还是找不到则沿着容器的继承体系向父级容器查找
// 为什么要委托父容器去找呢?
// ioc容器是可以被关联的:FileSystemXmlApplicationContext(String[] configLocations, ApplicationContext parent)
BeanFactory parentBeanFactory = getParentBeanFactory();
//当前容器的父级容器存在,且当前容器中不存在指定名称的 Bean
if (parentBeanFactory != null && !containsBeanDefinition(beanName)) {
//解析指定 Bean 名称的原始名称
// 因为有可能是通过别名去获取bean
String nameToLookup = originalBeanName(name);
if (args != null) {
//委派父级容器根据指定名称和显式的参数查找
// 为什么不传类型再去校验一次?
// 因为这一步是强转成T , 如果类型不一样, 在后续的调用中会报错ClassCastException
return (T) parentBeanFactory.getBean(nameToLookup, args);
}
else {
//委派父级容器根据指定名称和类型查找
return parentBeanFactory.getBean(nameToLookup, requiredType);
}
}
//如果不是只检查类型,那就标记这个Bean被创建了~~添加到缓存里 也就是所谓的 当前创建Bean池
if (!typeCheckOnly) {
//向容器标记指定的 Bean 已经被创建
markBeanAsCreated(beanName);
}
//根据指定 Bean 名称获取其父级的 Bean 定义
//主要解决 Bean 继承时子类合并父类公共属性问题
final RootBeanDefinition mbd = getMergedLocalBeanDefinition(beanName);
checkMergedBeanDefinition(mbd, beanName, args);
//获取当前 Bean 所有依赖 Bean 的名称
String[] dependsOn = mbd.getDependsOn();
//如果当前 Bean 有依赖 Bean
if (dependsOn != null) {
for (String dependsOnBean : dependsOn) {
//递归调用 getBean 方法,获取当前 Bean 的依赖 Bean
getBean(dependsOnBean);
//把被依赖 Bean 注册给当前依赖的 Bean
registerDependentBean(dependsOnBean, beanName);
}
}
//创建单例模式 Bean 的实例对象
if (mbd.isSingleton()) {
//这里使用了一个匿名内部类,创建 Bean 实例对象,并且注册给所依赖的对象
sharedInstance = getSingleton(beanName, new ObjectFactory() {
public Object getObject() throws BeansException {
try {
//创建一个指定 Bean 实例对象,如果有父级继承,则合并子类和父类的定义
return createBean(beanName, mbd, args);
}
catch (BeansException ex) {
//显式地从容器单例模式 Bean 缓存中清除实例对象
destroySingleton(beanName);
throw ex;
}
}
});
//获取给定 Bean 的实例对象
bean = getObjectForBeanInstance(sharedInstance, name, beanName, mbd);
}
//IOC 容器创建原型模式 Bean 实例对象
else if (mbd.isPrototype()) {
//原型模式(Prototype)是每次都会创建一个新的对象
Object prototypeInstance = null;
try {
//回调 beforePrototypeCreation 方法,默认的功能是注册当前创//建的原型对象
beforePrototypeCreation(beanName);
//创建指定 Bean 对象实例
prototypeInstance = createBean(beanName, mbd, args);
}
finally {
//回调 afterPrototypeCreation 方法,默认的功能告诉 IoC 容器指定 Bean 的原型对象不再创建了
//就是该bean已经从ing状态变成了ed状态
afterPrototypeCreation(beanName);
}
//获取给定 Bean 的实例对象
bean = getObjectForBeanInstance(prototypeInstance, name, beanName, mbd);
}
//要创建的 Bean 既不是单例模式,也不是原型模式,则根据 Bean 定义资源中
//配置的生命周期范围,选择实例化 Bean 的合适方法,这种在 Web 应用程序中
//比较常用,如:request、session、application 等生命周期
else {
String scopeName = mbd.getScope();
//其实scopt也是用map去管理的, 这样的很多地方就能理解通了
//private final Map<String, Scope> scopes = new HashMap<String, Scope>(8);
final Scope scope = this.scopes.get(scopeName);
//Bean 定义资源中没有配置生命周期范围,则 Bean 定义不合法
if (scope == null) {
throw new IllegalStateException("No Scope registered for scope '" + scopeName + "'");
}
try {
//这里又使用了一个匿名内部类,获取一个指定生命周期范围的实例
Object scopedInstance = scope.get(beanName, new ObjectFactory() {
public Object getObject() throws BeansException {
beforePrototypeCreation(beanName);
try {
return createBean(beanName, mbd, args);
}
finally {
afterPrototypeCreation(beanName);
}
}
});
//获取给定 Bean 的实例对象
bean = getObjectForBeanInstance(scopedInstance, name, beanName, mbd);
}
catch (IllegalStateException ex) {
throw new BeanCreationException(beanName,
"Scope '" + scopeName + "' is not active for the current thread; " +
"consider defining a scoped proxy for this bean if you intend to refer to
it from a singleton",
ex);
}
}
}
//对创建的 Bean 实例对象进行类型检查
if (requiredType != null && bean != null && !requiredType.isAssignableFrom(bean.getClass()))
{
throw new BeanNotOfRequiredTypeException(name, requiredType, bean.getClass());
}
return (T) bean;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
protected Object getObjectForBeanInstance(
Object beanInstance, String name, String beanName, RootBeanDefinition mbd) {

// Don't let calling code try to dereference the factory if the bean isn't a factory.
// name 是否是工厂bean标记
// beanInstance 是否继承factoryBean
if (BeanFactoryUtils.isFactoryDereference(name) && !(beanInstance instanceof FactoryBean)) {
throw new BeanIsNotAFactoryException(transformedBeanName(name), beanInstance.getClass());
}

// Now we have the bean instance, which may be a normal bean or a FactoryBean.
// If it's a FactoryBean, we use it to create a bean instance, unless the
// caller actually wants a reference to the factory.
// 1、true || false 普通类 纯的普通类
// 2、false || true 工厂类 纯的工厂
// 3、false || false 普通类 没有定义为工厂的普通工厂类
if (!(beanInstance instanceof FactoryBean) || BeanFactoryUtils.isFactoryDereference(name)) {
return beanInstance;
}

Object object = null;
if (mbd == null) {
object = getCachedObjectForFactoryBean(beanName);
}
if (object == null) {
// Return bean instance from factory.
FactoryBean<?> factory = (FactoryBean<?>) beanInstance;
// Caches object obtained from FactoryBean if it is a singleton.
if (mbd == null && containsBeanDefinition(beanName)) {
mbd = getMergedLocalBeanDefinition(beanName);
}
// 是否是合成的,意思就是是否当做工具类来使用
// 这里的synthetic实际上是为了给用户自定义一些BeanDefinition注册到容器中以当作工具类来使用。
// 什么是synthetic
// 就是内部类,java在编译的时候内部类也会编译成单独的一个文件,那实际上,原始类及时两个类的合成类
// 工具类就没有必要做一些封装、代理等工作,实际上是拿来就用,没有其他处理
// 对于synthetic类型的BeanDefinition,getObjectFromFactoryBean中是不会对FactoryBean生成的bean用post-processor进行后置处理的。
// 后置处理的实现是在AbstractAutowireCapableBeanFactory.postProcessObjectFromFactoryBean中,
// 它会调用容器中的BeanPostProcessor.postProcessAfterInitialization,这里提供了一个扩展点对FactoryBean生成的bean进行封装,代理等
boolean synthetic = (mbd != null && mbd.isSynthetic());
// bean实例化的缓存
// private final Map<String, Object> factoryBeanObjectCache = new ConcurrentHashMap<String, Object>(16);
object = getObjectFromFactoryBean(factory, beanName, !synthetic);
}
return object;
}
1
2
3
4
5
6
//对 FactoryBean 的转义定义,因为如果使用 bean 的名字检索 FactoryBean 得到的对象是工厂生成的对象,
//如果需要得到工厂本身,需要转义
String FACTORY_BEAN_PREFIX = "&";
public static boolean isFactoryDereference(String name) {
return (name != null && name.startsWith(BeanFactory.FACTORY_BEAN_PREFIX));
}
1
2
3
4
5
6
7
private final ThreadLocal<Object> prototypesCurrentlyInCreation =
new NamedThreadLocal<Object>("Prototype beans currently in creation");
protected boolean isPrototypeCurrentlyInCreation(String beanName) {
Object curVal = this.prototypesCurrentlyInCreation.get();
return (curVal != null &&
(curVal.equals(beanName) || (curVal instanceof Set && ((Set<?>) curVal).contains(beanName))));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected void beforePrototypeCreation(String beanName) {
Object curVal = this.prototypesCurrentlyInCreation.get();
if (curVal == null) {
this.prototypesCurrentlyInCreation.set(beanName);
}
else if (curVal instanceof String) {
Set<String> beanNameSet = new HashSet<String>(2);
beanNameSet.add((String) curVal);
beanNameSet.add(beanName);
this.prototypesCurrentlyInCreation.set(beanNameSet);
}
else {
Set<String> beanNameSet = (Set<String>) curVal;
beanNameSet.add(beanName);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
protected void afterPrototypeCreation(String beanName) {
Object curVal = this.prototypesCurrentlyInCreation.get();
if (curVal instanceof String) {
this.prototypesCurrentlyInCreation.remove();
}
else if (curVal instanceof Set) {
Set<String> beanNameSet = (Set<String>) curVal;
beanNameSet.remove(beanName);
if (beanNameSet.isEmpty()) {
this.prototypesCurrentlyInCreation.remove();
}
}
}

通过上面对向 IOC 容器获取 Bean 方法的分析,我们可以看到在 Spring 中,如果 Bean 定义的单例模式
(Singleton),则容器在创建之前先从缓存中查找,以确保整个容器中只存在一个实例对象。如果 Bean
定义的是原型模式(Prototype),则容器每次都会创建一个新的实例对象。除此之外,Bean 定义还可以
扩展为指定其生命周期范围。

上面的源码只是定义了根据 Bean 定义的模式,采取的不同创建 Bean 实例对象的策略,具体的 Bean 实
例对象的创建过程由实现了 ObejctFactory 接口的匿名内部类的 createBean 方法完成,ObejctFactory
使用委派模式,具体的 Bean 实例创建过程交由其实现类 AbstractAutowireCapableBeanFactory 完成,
我们继续分析 AbstractAutowireCapableBeanFactory 的 createBean 方法的源码,理解其创建 Bean 实
例的具体实现过程。

循环依赖的产生和解决的前提

  • A的构造方法中依赖了B的实例对象,同时B的构造方法中依赖了A的实例对象
  • A的构造方法中依赖了B的实例对象,同时B的某个field或者setter需要A的实例对象
  • A的某个field或者setter依赖了B的实例对象,同时B的某个field或者setter依赖了A的实例对象
    当然,Spring对于循环依赖的解决不是无条件的,首先前提条件是针对scope单例并且没有显式指明不需要解决循环依赖的对象,而且要求该对象没有被代理过。同时Spring解决循环依赖也不是万能,以上三种情况只能解决两种,第一种在构造方法中相互依赖的情况Spring也无力回天。结论先给在这,下面来看看Spring的解决方法,知道了解决方案就能明白为啥第一种情况无法解决了。
    Spring单例对象的初始化其实可以分为三步:
  • createBeanInstance, 实例化,实际上就是调用对应的构造方法构造对象,此时只是调用了构造方法,spring xml中指定的property并没有进行populate
  • populateBean 填充属性,这步对spring xml中指定的property进行populate
  • initializeBean 调用spring xml中指定的init方法,或者AfterPropertiesSet方法会发生循环依赖的步骤集中在第一步和第二步。

¶三级缓存

对于单例对象来说,在Spring的整个容器的生命周期内,有且只存在一个对象,很容易想到这个对象应该存在Cache中,Spring大量运用了Cache的手段,在循环依赖问题的解决过程中甚至使用了“三级缓存”。
“三级缓存”主要是指

1
2
3
4
5
6
7
8
9
// 完美并且已经在使用的bean
/** Cache of singleton objects: bean name --> bean instance */
private final Map<String, Object> singletonObjects = new ConcurrentHashMap<String, Object>(256);
// bean的工厂方法
/** Cache of early singleton objects: bean name --> bean instance */
private final Map<String, Object> earlySingletonObjects = new HashMap<String, Object>(16);
// 新创建或不完美的bean
/** Cache of singleton factories: bean name --> ObjectFactory */
private final Map<String, ObjectFactory<?>> singletonFactories = new HashMap<String, ObjectFactory<?>>(16);

从字面意思来说:singletonObjects指单例对象的cache,singletonFactories指单例对象工厂的cache,earlySingletonObjects指提前曝光的单例对象的cache。以上三个cache构成了三级缓存,Spring就用这三级缓存巧妙的解决了循环依赖问题。
首先Spring会尝试从缓存中获取,这个缓存就是指singletonObjects,主要调用的方法是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected Object getSingleton(String beanName, boolean allowEarlyReference) {
Object singletonObject = this.singletonObjects.get(beanName);
if (singletonObject == null && isSingletonCurrentlyInCreation(beanName)) {
synchronized (this.singletonObjects) {
singletonObject = this.earlySingletonObjects.get(beanName);
if (singletonObject == null && allowEarlyReference) {
ObjectFactory<?> singletonFactory = this.singletonFactories.get(beanName);
if (singletonFactory != null) {
singletonObject = singletonFactory.getObject();
this.earlySingletonObjects.put(beanName, singletonObject);
this.singletonFactories.remove(beanName);
}
}
}
}
return (singletonObject != NULL_OBJECT ? singletonObject : null);}

首先解释两个参数:

  • isSingletonCurrentlyInCreation 判断对应的单例对象是否在创建中,当单例对象没有被初始化完全(例如A定义的构造函数依赖了B对象,得先去创建B对象,或者在populatebean过程中依赖了B对象,得先去创建B对象,此时A处于创建中)
  • allowEarlyReference 是否允许从singletonFactories中通过getObject拿到对象,其实就是是否允许我重新创建一个

分析getSingleton的整个过程,Spring首先从singletonObjects(一级缓存)中尝试获取,如果获取不到并且对象在创建中,则尝试从earlySingletonObjects(二级缓存)中获取,如果还是获取不到并且允许从singletonFactories通过getObject获取,则通过singletonFactory.getObject()(三级缓存)获取。如果获取到了则

1
2
this.earlySingletonObjects.put(beanName, singletonObject);
this.singletonFactories.remove(beanName);

Spring解决循环依赖的诀窍就在于singletonFactories这个cache,这个cache中存的是类型为ObjectFactory,其定义如下:

1
2
public interface ObjectFactory<T> {
T getObject() throws BeansException;}

在bean创建过程中,有两处比较重要的匿名内部类实现了该接口。一处是

1
2
3
4
5
6
7
8
new ObjectFactory<Object>() {
@Override public Object getObject() throws BeansException {
try {
return createBean(beanName, mbd, args);
} catch (BeansException ex) {
destroySingleton(beanName);
throw ex;
} }

另一处就是:

1
2
3
4
addSingletonFactory(beanName, new ObjectFactory<Object>() {
@Override public Object getObject() throws BeansException {
return getEarlyBeanReference(beanName, mbd, bean);
}});

此处就是解决循环依赖的关键,这段代码发生在createBeanInstance之后,也就是说单例对象此时已经被创建出来的。这个对象已经被生产出来了,虽然还不完美(还没有进行初始化的第二步和第三步),但是已经能被人认出来了(根据对象引用能定位到堆中的对象),所以Spring此时将这个对象提前曝光出来让大家认识,让大家使用。

这样做有什么好处呢?让我们来分析一下“A的某个field或者setter依赖了B的实例对象,同时B的某个field或者setter依赖了A的实例对象”这种循环依赖的情况。A首先完成了初始化的第一步,并且将自己提前曝光到singletonFactories中,此时进行初始化的第二步,发现自己依赖对象B,此时就尝试去get(B),发现B还没有被create,所以走create流程,B在初始化第一步的时候发现自己依赖了对象A,于是尝试get(A),尝试一级缓存singletonObjects(肯定没有,因为A还没初始化完全),尝试二级缓存earlySingletonObjects(也没有),尝试三级缓存singletonFactories,由于A通过ObjectFactory将自己提前曝光了,所以B能够通过ObjectFactory.getObject拿到A对象(虽然A还没有初始化完全,但是总比没有好呀),B拿到A对象后顺利完成了初始化阶段1、2、3,完全初始化之后将自己放入到一级缓存singletonObjects中。此时返回A中,A此时能拿到B的对象顺利完成自己的初始化阶段2、3,最终A也完成了初始化,长大成人,进去了一级缓存singletonObjects中,而且更加幸运的是,由于B拿到了A的对象引用,所以B现在hold住的A对象也蜕变完美了!

知道了这个原理时候,肯定就知道为啥Spring不能解决“A的构造方法中依赖了B的实例对象,同时B的构造方法中依赖了A的实例对象”这类问题了!

AbstractAutowireCapableBeanFactory 创建 Bean 实例对象

AbstractAutowireCapableBeanFactory 类实现了 ObejctFactory 接口,创建容器指定的 Bean 实例对象,
同时还对创建的 Bean 实例对象进行初始化处理。其创建 Bean 实例对象的方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
  //创建 Bean 实例对象
protected Object createBean(final String beanName, final RootBeanDefinition mbd, final Object[]
args)
throws BeanCreationException {
if (logger.isDebugEnabled()) {
logger.debug("Creating instance of bean '" + beanName + "'");
}
//判断需要创建的 Bean 是否可以实例化,即是否可以通过当前的类加载器加载
resolveBeanClass(mbd, beanName);
//校验和准备 Bean 中的方法覆盖
//除了原型模式,其他的bean基本上都会使用代理去创建
//默认使用cglib创建代理,好处就是spring有了代理类的控制权,同时也兼容接口代理
try {
mbd.prepareMethodOverrides();
}
catch (BeanDefinitionValidationException ex) {
throw new BeanDefinitionStoreException(mbd.getResourceDescription(),
beanName, "Validation of method overrides failed", ex);
}
try {
//如果 Bean 配置了初始化前和初始化后的处理器,则试图返回一个需要创建 Bean 的代理对象
Object bean = resolveBeforeInstantiation(beanName, mbd);
if (bean != null) {
return bean;
}
}
catch (Throwable ex) {
throw new BeanCreationException(mbd.getResourceDescription(), beanName,
"BeanPostProcessor before instantiation of bean failed", ex);
}
//创建 Bean 的入口
Object beanInstance = doCreateBean(beanName, mbd, args);
if (logger.isDebugEnabled()) {
logger.debug("Finished creating instance of bean '" + beanName + "'");
}
return beanInstance;
}
//真正创建 Bean 的方法
protected Object doCreateBean(final String beanName, final RootBeanDefinition mbd, final Object[]
args) {
//封装被创建的 Bean 对象
BeanWrapper instanceWrapper = null;
if (mbd.isSingleton()){
//单例模式的 Bean,先从容器中缓存中获取同名 Bean
instanceWrapper = this.factoryBeanInstanceCache.remove(beanName);
}
if (instanceWrapper == null) {
//创建实例对象
//生成 Bean 所包含的 java 对象实例
instanceWrapper = createBeanInstance(beanName, mbd, args);
}
final Object bean = (instanceWrapper != null ? instanceWrapper.getWrappedInstance() : null);
//获取实例化对象的类型
Class beanType = (instanceWrapper != null ? instanceWrapper.getWrappedClass() : null);
//调用 PostProcessor 后置处理器
//就是类似aware监听器一样,监听对象什么时候初始化成功
synchronized (mbd.postProcessingLock) {
if (!mbd.postProcessed) {
applyMergedBeanDefinitionPostProcessors(mbd, beanType, beanName);
mbd.postProcessed = true;
}
}
// Eagerly cache singletons to be able to resolve circular references
//向容器中缓存单例模式的 Bean 对象,以防循环引用
boolean earlySingletonExposure = (mbd.isSingleton() && this.allowCircularReferences &&
isSingletonCurrentlyInCreation(beanName));
//earlySingletonExposure:如果你的bean允许被早期暴露出去 也就是说可以被循环引用 那这里就会进行检查
if (earlySingletonExposure) {
if (logger.isDebugEnabled()) {
logger.debug("Eagerly caching bean '" + beanName +
"' to allow for resolving potential circular references");
}
//这里是一个匿名内部类,为了防止循环引用,尽早持有对象的引用
addSingletonFactory(beanName, new ObjectFactory() {
public Object getObject() throws BeansException {
return getEarlyBeanReference(beanName, mbd, bean);
}
});
}
//Bean 对象的初始化,依赖注入在此触发
//这个 exposedObject 在初始化完成之后返回作为依赖注入完成后的 Bean
Object exposedObject = bean;
try {
//将 Bean 实例对象封装,并且 Bean 定义中配置的属性值赋值给实例对象
//对 Bean 属性的依赖注入进行处理。
populateBean(beanName, mbd, instanceWrapper);
if (exposedObject != null) {
//初始化 Bean 对象
exposedObject = initializeBean(beanName, exposedObject, mbd);
}
}
catch (Throwable ex) {
if (ex instanceof BeanCreationException && beanName.equals(((BeanCreationException)
ex).getBeanName())) {
throw (BeanCreationException) ex;
}
else {
throw new BeanCreationException(mbd.getResourceDescription(), beanName,
"Initialization of bean failed", ex);
}
}
//此时一级缓存肯定还没数据,但是呢此时候二级缓存earlySingletonObjects也没数据
if (earlySingletonExposure) {
//获取指定名称的已注册的单例模式 Bean 对象
//此时一级缓存肯定还没数据,但是呢此时候二级缓存earlySingletonObjects也没数据
//第二参数为false 表示不会再去三级缓存里查了~~~
Object earlySingletonReference = getSingleton(beanName, false);
if (earlySingletonReference != null) {
//根据名称获取的已注册的 Bean 和正在实例化的 Bean 是同一个
if (exposedObject == bean) {
//当前实例化的 Bean 初始化完成
exposedObject = earlySingletonReference;
}
//当前 Bean 依赖其他 Bean,并且当发生循环引用时不允许新创建实例对象
// allowRawInjectionDespiteWrapping这个值默认是false
// hasDependentBean:若它有依赖的bean 那就需要继续校验了~~~(若没有依赖的 就放过它~)
else if (!this.allowRawInjectionDespiteWrapping && hasDependentBean(beanName)) {
String[] dependentBeans = getDependentBeans(beanName);
Set<String> actualDependentBeans = new
LinkedHashSet<String>(dependentBeans.length);
//获取当前 Bean 所依赖的其他 Bean
// 一个个检查它所以Bean
// removeSingletonIfCreatedForTypeCheckOnly这个放见下面 在AbstractBeanFactory里面
// 简单的说,它如果判断到该dependentBean并没有在创建中的了的情况下,那就把它从所有缓存中移除~~~ 并且返回true
// 否则(比如确实在创建中) 那就返回false 进入我们的if里面~ 表示所谓的真正依赖
//(解释:就是真的需要依赖它先实例化,才能实例化自己的依赖)
for (String dependentBean : dependentBeans) {
//对依赖 Bean 进行类型检查
if (!removeSingletonIfCreatedForTypeCheckOnly(dependentBean)) {
actualDependentBeans.add(dependentBean);
}
}
if (!actualDependentBeans.isEmpty()) {
throw new BeanCurrentlyInCreationException(beanName,
"Bean with name '" + beanName + "' has been injected into other beans
[" +
StringUtils.collectionToCommaDelimitedString(actualDependentBeans) +
"] in its raw version as part of a circular reference, but has eventually
been " +
"wrapped. This means that said other beans do not use the final version
of the " +
"bean. This is often the result of over-eager type matching - consider
using " +
"'getBeanNamesOfType' with the 'allowEagerInit' flag turned off, for
example.");
}
}
}
}
//注册完成依赖注入的 Bean
try {
registerDisposableBeanIfNecessary(beanName, bean, mbd);
}
catch (BeanDefinitionValidationException ex) {
throw new BeanCreationException(mbd.getResourceDescription(), beanName, "Invalid
destruction signature", ex);
}
return exposedObject;
}

1
2
3
4
5
6
7
8
9
10
11
12
/ 虽然是remove方法 但是它的返回值也非常重要
// 该方法唯一调用的地方就是循环依赖的最后检查处
protected boolean removeSingletonIfCreatedForTypeCheckOnly(String beanName) {
// 如果这个bean不在创建中 比如是ForTypeCheckOnly的 那就移除掉
if (!this.alreadyCreated.containsKey(beanName)) {
removeSingleton(beanName);
return true;
}
else {
return false;
}
}

通过对方法源码的分析,我们看到具体的依赖注入实现在以下两个方法中:

  • createBeanInstance:生成 Bean 所包含的 java 对象实例
  • populateBean :对 Bean 属性的依赖注入进行处理。

createBeanInstance 方法创建 Bean 的 java 实例对象

在 createBeanInstance 方法中,根据指定的初始化策略,使用静态工厂、工厂方法或者容器的自动装
配特性生成 java 实例对象,创建对象的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
//创建 Bean 的实例对象
protected BeanWrapper createBeanInstance(String beanName, RootBeanDefinition mbd, Object[] args)
{
//检查确认 Bean 是可实例化的
Class beanClass = resolveBeanClass(mbd, beanName);
//使用工厂方法对 Bean 进行实例化
if (beanClass != null && !Modifier.isPublic(beanClass.getModifiers())
&& !mbd.isNonPublicAccessAllowed()) {
throw new BeanCreationException(mbd.getResourceDescription(), beanName,
"Bean class isn't public, and non-public access not allowed: " +
beanClass.getName());
}
if (mbd.getFactoryMethodName() != null) {
//调用工厂方法实例化
return instantiateUsingFactoryMethod(beanName, mbd, args);
}
//使用容器的自动装配方法进行实例化
boolean resolved = false;
boolean autowireNecessary = false;
if (args == null) {
synchronized (mbd.constructorArgumentLock) {
if (mbd.resolvedConstructorOrFactoryMethod != null) {
resolved = true;
autowireNecessary = mbd.constructorArgumentsResolved;
}
}
}
if (resolved) {
if (autowireNecessary) {
//配置了自动装配属性,使用容器的自动装配实例化
//容器的自动装配是根据参数类型匹配 Bean 的构造方法
return autowireConstructor(beanName, mbd, null, null);
}
else {
//使用默认的无参构造方法实例化
return instantiateBean(beanName, mbd);
}
}
//使用 Bean 的构造方法进行实例化
Constructor[] ctors = determineConstructorsFromBeanPostProcessors(beanClass, beanName);
if (ctors != null ||
mbd.getResolvedAutowireMode() == RootBeanDefinition.AUTOWIRE_CONSTRUCTOR ||
mbd.hasConstructorArgumentValues() || !ObjectUtils.isEmpty(args)) {
//使用容器的自动装配特性,调用匹配的构造方法实例化
return autowireConstructor(beanName, mbd, ctors, args);
}
//使用默认的无参构造方法实例化
return instantiateBean(beanName, mbd);
}
//使用默认的无参构造方法实例化 Bean 对象
protected BeanWrapper instantiateBean(final String beanName, final RootBeanDefinition mbd) {
try {
Object beanInstance;
final BeanFactory parent = this;
//获取系统的安全管理接口,JDK 标准的安全管理 API
if (System.getSecurityManager() != null) {
//这里是一个匿名内置类,根据实例化策略创建实例对象
beanInstance = AccessController.doPrivileged(new PrivilegedAction<Object>() {
public Object run() {
return getInstantiationStrategy().instantiate(mbd, beanName, parent);
}
}, getAccessControlContext());
}
else {
//将实例化的对象封装起来
beanInstance = getInstantiationStrategy().instantiate(mbd, beanName, parent);
}
BeanWrapper bw = new BeanWrapperImpl(beanInstance);
initBeanWrapper(bw);
return bw;
}
catch (Throwable ex) {
throw new BeanCreationException(mbd.getResourceDescription(), beanName, "Instantiation of
bean failed", ex);
}
}

经过对上面的代码分析,我们可以看出,对使用工厂方法和自动装配特性的 Bean 的实例化相当比较清
楚,调用相应的工厂方法或者参数匹配的构造方法即可完成实例化对象的工作,但是对于我们最常使用
的默认无参构造方法就需要使用相应的初始化策略(JDK 的反射机制或者 CGLIB)来进行初始化了,在方
法 getInstantiationStrategy().instantiate 中就具体实现类使用初始策略实例化对象

SimpleInstantiationStrategy 类使用默认的无参构造方法创建 Bean 实例化对象

在使用默认的无参构造方法创建Bean的实例化对象时,方法getInstantiationStrategy().instantiate
调用了 SimpleInstantiationStrategy 类中的实例化 Bean 的方法,其源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
  //使用初始化策略实例化 Bean 对象
public Object instantiate(RootBeanDefinition beanDefinition, String beanName, BeanFactory owner)
{
//如果 Bean 定义中没有方法覆盖,则就不需要 CGLIB
//那么是什么条件才会触发这个MethodOverrides呢?
//其实是Spring配置文件中的lookup-method和replace-method,
//这其实是两个方法级别的注入,和一般的属性(Property)注入是不一样的,
//它们注入的是方法(Method)。
//两者的差别是这样的
//如果需要替换的方法没有返回值,那么只能使用replace-method来替换,而不能用lookup-method来替换。
//replace-method必须实现MethodReplacer接口的Bean才能替换,而lookup-method则由BeanFactory自动为我们处理了。,
if (beanDefinition.getMethodOverrides().isEmpty()) {
Constructor<?> constructorToUse;
synchronized (beanDefinition.constructorArgumentLock) {
//获取对象的构造方法或工厂方法
constructorToUse = (Constructor<?>)
beanDefinition.resolvedConstructorOrFactoryMethod;
//如果没有构造方法且没有工厂方法
if (constructorToUse == null) {
//使用 JDK 的反射机制,判断要实例化的 Bean 是否是接口
final Class clazz = beanDefinition.getBeanClass();
if (clazz.isInterface()) {
throw new BeanInstantiationException(clazz, "Specified class is an interface");
}
try {
if (System.getSecurityManager() != null) {
//这里是一个匿名内置类,使用反射机制获取 Bean 的构造方法
constructorToUse = AccessController.doPrivileged(new
PrivilegedExceptionAction<Constructor>() {
public Constructor run() throws Exception {
return clazz.getDeclaredConstructor((Class[]) null);
}
});
}
else {
constructorToUse = clazz.getDeclaredConstructor((Class[]) null);
}
beanDefinition.resolvedConstructorOrFactoryMethod = constructorToUse;
}
catch (Exception ex) {
throw new BeanInstantiationException(clazz, "No default constructor found",
ex);
}
}
}
//使用 BeanUtils 实例化,通过反射机制调用”构造方法.newInstance(arg)”来进行实例化
return BeanUtils.instantiateClass(constructorToUse);
}
else {
//使用 CGLIB 来实例化对象
return instantiateWithMethodInjection(beanDefinition, beanName, owner);
}
}

通过上面的代码分析,我们看到了如果 Bean 有方法被覆盖了,则使用 JDK 的反射机制进行实例化,否
则,使用 CGLIB 进行实例化。

instantiateWithMethodInjection 方法调用 SimpleInstantiationStrategy 的子类
CglibSubclassingInstantiationStrategy 使用 CGLIB 来进行初始化,其源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//使用 CGLIB 进行 Bean 对象实例化
public Object instantiate(Constructor ctor, Object[] args) {
//CGLIB 中的类
Enhancer enhancer = new Enhancer();
//将 Bean 本身作为其基类
enhancer.setSuperclass(this.beanDefinition.getBeanClass());
enhancer.setCallbackFilter(new CallbackFilterImpl());
enhancer.setCallbacks(new Callback[] {
NoOp.INSTANCE,
new LookupOverrideMethodInterceptor(),
new ReplaceOverrideMethodInterceptor()
});
//使用 CGLIB 的 create 方法生成实例对象
return (ctor == null) ?
enhancer.create() :
enhancer.create(ctor.getParameterTypes(), args);
}

CGLIB 是一个常用的字节码生成器的类库,它提供了一系列 API 实现 java 字节码的生成和转换功能。我
们在学习 JDK 的动态代理时都知道,JDK 的动态代理只能针对接口,如果一个类没有实现任何接口,要
对其进行动态代理只能使用 CGLIB。

populateBean 方法对 Bean 属性的依赖注入

在上面的分析中我们已经了解到 Bean 的依赖注入分为以下两个过程

  • createBeanInstance:生成 Bean 所包含的 java 对象实例
  • populateBean :对 Bean 属性的依赖注入进行处理

我们已经分析了容器初始化生成 Bean 所包含的 Java 实例对象的过程,现在我们继续分析
生成对象后,Spring IOC 容器是如何将 Bean 的属性依赖关系注入 Bean 实例对象中并设置好的,属性
依赖注入的代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
   //将 Bean 属性设置到生成的实例对象上
protected void populateBean(String beanName, AbstractBeanDefinition mbd, BeanWrapper bw) {
//获取容器在解析 Bean 定义资源时为 BeanDefiniton 中设置的属性值
PropertyValues pvs = mbd.getPropertyValues();
//实例对象为 null
if (bw == null) {
//属性值不为空
if (!pvs.isEmpty()) {
throw new BeanCreationException(
mbd.getResourceDescription(), beanName, "Cannot apply property values to null
instance");
}
else {
//实例对象为 null,属性值也为空,不需要设置属性值,直接返回
return;
}
}
//在设置属性之前调用 Bean 的 PostProcessor 后置处理器
boolean continueWithPropertyPopulation = true;
if (!mbd.isSynthetic() && hasInstantiationAwareBeanPostProcessors()) {
for (BeanPostProcessor bp : getBeanPostProcessors()) {
if (bp instanceof InstantiationAwareBeanPostProcessor) {
InstantiationAwareBeanPostProcessor ibp = (InstantiationAwareBeanPostProcessor)
bp;
if (!ibp.postProcessAfterInstantiation(bw.getWrappedInstance(), beanName)) {
continueWithPropertyPopulation = false;
break;
}
}
}
}
if (!continueWithPropertyPopulation) {
return;
}
//依赖注入开始,首先处理 autowire 自动装配的注入
if (mbd.getResolvedAutowireMode() == RootBeanDefinition.AUTOWIRE_BY_NAME ||
mbd.getResolvedAutowireMode() == RootBeanDefinition.AUTOWIRE_BY_TYPE) {
MutablePropertyValues newPvs = new MutablePropertyValues(pvs);
//对 autowire 自动装配的处理,根据 Bean 名称自动装配注入
if (mbd.getResolvedAutowireMode() == RootBeanDefinition.AUTOWIRE_BY_NAME) {
autowireByName(beanName, mbd, bw, newPvs);
}
//根据 Bean 类型自动装配注入
if (mbd.getResolvedAutowireMode() == RootBeanDefinition.AUTOWIRE_BY_TYPE) {
autowireByType(beanName, mbd, bw, newPvs);
}
pvs = newPvs;
}
//检查容器是否持有用于处理单例模式 Bean 关闭时的后置处理器
boolean hasInstAwareBpps = hasInstantiationAwareBeanPostProcessors();
//Bean 实例对象没有依赖,即没有继承基类
boolean needsDepCheck = (mbd.getDependencyCheck() !=
RootBeanDefinition.DEPENDENCY_CHECK_NONE);
if (hasInstAwareBpps || needsDepCheck) {
//从实例对象中提取属性描述符
PropertyDescriptor[] filteredPds = filterPropertyDescriptorsForDependencyCheck(bw);
if (hasInstAwareBpps) {
for (BeanPostProcessor bp : getBeanPostProcessors()) {
if (bp instanceof InstantiationAwareBeanPostProcessor) {
InstantiationAwareBeanPostProcessor ibp =
(InstantiationAwareBeanPostProcessor) bp;
//使用 BeanPostProcessor 处理器处理属性值
pvs = ibp.postProcessPropertyValues(pvs, filteredPds, bw.getWrappedInstance(),
beanName);
if (pvs == null) {
return;
}
}
}
}
if (needsDepCheck) {
//为要设置的属性进行依赖检查
checkDependencies(beanName, mbd, filteredPds, pvs);
}
}
//对属性进行注入
applyPropertyValues(beanName, mbd, bw, pvs);
}

protected void autowireByName(
String beanName, AbstractBeanDefinition mbd, BeanWrapper bw, MutablePropertyValues pvs) {

String[] propertyNames = unsatisfiedNonSimpleProperties(mbd, bw);
for (String propertyName : propertyNames) {
if (containsBean(propertyName)) {
//使用当前Bean的属性名,在IoC容器中获取对应的bean,让将获取的bean设置为当前的Bean的属性值
Object bean = getBean(propertyName);
pvs.add(propertyName, bean);
registerDependentBean(propertyName, beanName);
if (logger.isDebugEnabled()) {
logger.debug("Added autowiring by name from bean name '" + beanName +
"' via property '" + propertyName + "' to bean named '" + propertyName + "'");
}
}
else {
if (logger.isTraceEnabled()) {
logger.trace("Not autowiring property '" + propertyName + "' of bean '" + beanName +
"' by name: no matching bean found");
}
}
}
}

public void registerDependentBean(String beanName, String dependentBeanName) {
String canonicalName = canonicalName(beanName);
// 翻译一下就是:set里的bean都要依赖 key
synchronized (this.dependentBeanMap) {
Set<String> dependentBeans = this.dependentBeanMap.get(canonicalName);
if (dependentBeans == null) {
dependentBeans = new LinkedHashSet<String>(8);
this.dependentBeanMap.put(canonicalName, dependentBeans);
}
dependentBeans.add(dependentBeanName);
}
// 翻译一下就是:key需要依赖set中的bean
synchronized (this.dependenciesForBeanMap) {
Set<String> dependenciesForBean = this.dependenciesForBeanMap.get(dependentBeanName);
if (dependenciesForBean == null) {
dependenciesForBean = new LinkedHashSet<String>(8);
this.dependenciesForBeanMap.put(dependentBeanName, dependenciesForBean);
}
dependenciesForBean.add(canonicalName);
}
}

protected Map<String, Object> findAutowireCandidates(
String beanName, Class<?> requiredType, DependencyDescriptor descriptor) {

//找一下所有的Bean定义中指定Type的实现类或者子类
String[] candidateNames = BeanFactoryUtils.beanNamesForTypeIncludingAncestors(
this, requiredType, true, descriptor.isEager());
Map<String, Object> result = new LinkedHashMap<String, Object>(candidateNames.length);
//要自动装配的类型是不是要自动装配的纠正类型,这个在
// 非懒加载的单例Bean初始化前后的一些操作,如果要自动装配的类型是纠正类型,
// 比如是一个ResourceLoader,那么就会为该类型生成一个代理实例,
// 具体可以看一下 AutowireUtils.resolveAutowiringValue(autowiringValue, requiredType);
for (Class<?> autowiringType : this.resolvableDependencies.keySet()) {
if (autowiringType.isAssignableFrom(requiredType)) {
Object autowiringValue = this.resolvableDependencies.get(autowiringType);
autowiringValue = AutowireUtils.resolveAutowiringValue(autowiringValue, requiredType);
if (requiredType.isInstance(autowiringValue)) {
result.put(ObjectUtils.identityToString(autowiringValue), autowiringValue);
break;
}
}
}
//逐个判断查找一下beanName对应的BeanDefinition,判断一下是不是自动装配候选者,默认都是的,如果<bean>的autowire-candidate属性设置为false就不是
for (String candidateName : candidateNames) {
if (!candidateName.equals(beanName) && isAutowireCandidate(candidateName, descriptor)) {
result.put(candidateName, getBean(candidateName));
}
}
return result;
}

@Override
public String[] getBeanNamesForType(Class<?> type, boolean includeNonSingletons, boolean allowEagerInit) {
if (!isConfigurationFrozen() || type == null || !allowEagerInit) {
return doGetBeanNamesForType(type, includeNonSingletons, allowEagerInit);
}
Map<Class<?>, String[]> cache =
(includeNonSingletons ? this.allBeanNamesByType : this.singletonBeanNamesByType);
String[] resolvedBeanNames = cache.get(type);
if (resolvedBeanNames != null) {
return resolvedBeanNames;
}
resolvedBeanNames = doGetBeanNamesForType(type, includeNonSingletons, allowEagerInit);
cache.put(type, resolvedBeanNames);
return resolvedBeanNames;
}


//解析并注入依赖属性的过程
protected void applyPropertyValues(String beanName, BeanDefinition mbd, BeanWrapper bw,
PropertyValues pvs) {
if (pvs == null || pvs.isEmpty()) {
return;
}
//封装属性值
MutablePropertyValues mpvs = null;
List<PropertyValue> original;
if (System.getSecurityManager()!= null) {
if (bw instanceof BeanWrapperImpl) {
//设置安全上下文,JDK 安全机制
((BeanWrapperImpl) bw).setSecurityContext(getAccessControlContext());
}
}
if (pvs instanceof MutablePropertyValues) {
mpvs = (MutablePropertyValues) pvs;
//属性值已经转换
if (mpvs.isConverted()) {
try {
//为实例化对象设置属性值
bw.setPropertyValues(mpvs);
return;
}
catch (BeansException ex) {
throw new BeanCreationException(
mbd.getResourceDescription(), beanName, "Error setting property values",
ex);
}
}
//获取属性值对象的原始类型值
original = mpvs.getPropertyValueList();
}
else {
original = Arrays.asList(pvs.getPropertyValues());
}
//获取用户自定义的类型转换
TypeConverter converter = getCustomTypeConverter();
if (converter == null) {
converter = bw;
}
//创建一个 Bean 定义属性值解析器,将 Bean 定义中的属性值解析为 Bean 实例对象的实际值
BeanDefinitionValueResolver valueResolver = new BeanDefinitionValueResolver(this, beanName,
mbd, converter);
//为属性的解析值创建一个拷贝,将拷贝的数据注入到实例对象中
List<PropertyValue> deepCopy = new ArrayList<PropertyValue>(original.size());
boolean resolveNecessary = false;
for (PropertyValue pv : original) {
//属性值不需要转换
if (pv.isConverted()) {
deepCopy.add(pv);
}
//属性值需要转换
else {
String propertyName = pv.getName();
//原始的属性值,即转换之前的属性值
Object originalValue = pv.getValue();
//转换属性值,例如将引用转换为 IOC 容器中实例化对象引用
//检查,接口和类、父类和子类之间的关系是否正确
//同时处理ref类型数据
/**
* <bean class="com.jphoebe.xxx">
* <property name="referBeanName" ref="otherBeanName" />
* </bean>
*/
Object resolvedValue = valueResolver.resolveValueIfNecessary(pv, originalValue);
//转换之后的属性值
Object convertedValue = resolvedValue;
//属性值是否可以转换
boolean convertible = bw.isWritableProperty(propertyName) &&
!PropertyAccessorUtils.isNestedOrIndexedProperty(propertyName);
if (convertible) {
//使用用户自定义的类型转换器转换属性值
convertedValue = convertForProperty(resolvedValue, propertyName, bw, converter);
}
//存储转换后的属性值,避免每次属性注入时的转换工作
if (resolvedValue == originalValue) {
if (convertible) {
//设置属性转换之后的值
pv.setConvertedValue(convertedValue);
}
deepCopy.add(pv);
}
//属性是可转换的,且属性原始值是字符串类型,且属性的原始类型值不是
//动态生成的字符串,且属性的原始值不是集合或者数组类型
else if (convertible && originalValue instanceof TypedStringValue &&
!((TypedStringValue) originalValue).isDynamic() &&
!(convertedValue instanceof Collection ||
ObjectUtils.isArray(convertedValue))) {
pv.setConvertedValue(convertedValue);
deepCopy.add(pv);
}
else {
resolveNecessary = true;
//重新封装属性的值
deepCopy.add(new PropertyValue(pv, convertedValue));
}
}
}
if (mpvs != null && !resolveNecessary) {
//标记属性值已经转换过
mpvs.setConverted();
}
//进行属性依赖注入
try {
bw.setPropertyValues(new MutablePropertyValues(deepCopy));
}
catch (BeansException ex) {
throw new BeanCreationException(
mbd.getResourceDescription(), beanName, "Error setting property values", ex);
}
}

分析上述代码,我们可以看出,对属性的注入过程分以下两种情况:

  • 属性值类型不需要转换时,不需要解析属性值,直接准备进行依赖注入
  • 属性值需要进行类型转换时,如对其他对象的引用等,首先需要解析属性值,然后对解析后的属性值进行依赖注入。

对属性值的解析是在 BeanDefinitionValueResolver 类中的 resolveValueIfNecessary 方法中进行的,
对属性值的依赖注入是通过 bw.setPropertyValues 方法实现的,在分析属性值的依赖注入之前,我们
先分析一下对属性值的解析过程。

BeanDefinitionValueResolver 解析属性值

当容器在对属性进行依赖注入时,如果发现属性值需要进行类型转换,如属性值是容器中另一个 Bean
实例对象的引用,则容器首先需要根据属性值解析出所引用的对象,然后才能将该引用对象注入到目标
实例对象的属性上去,对属性进行解析的由 resolveValueIfNecessary 方法实现,其源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
//解析属性值,对注入类型进行转换
public Object resolveValueIfNecessary(Object argName, Object value) {
//对引用类型的属性进行解析
if (value instanceof RuntimeBeanReference) {
RuntimeBeanReference ref = (RuntimeBeanReference) value;
//调用引用类型属性的解析方法
return resolveReference(argName, ref);
}
//对属性值是引用容器中另一个 Bean 名称的解析
else if (value instanceof RuntimeBeanNameReference) {
String refName = ((RuntimeBeanNameReference) value).getBeanName();
refName = String.valueOf(evaluate(refName));
//从容器中获取指定名称的 Bean
if (!this.beanFactory.containsBean(refName)) {
throw new BeanDefinitionStoreException(
"Invalid bean name '" + refName + "' in bean reference for " + argName);
}
return refName;
}
//对 Bean 类型属性的解析,主要是 Bean 中的内部类
else if (value instanceof BeanDefinitionHolder) {
BeanDefinitionHolder bdHolder = (BeanDefinitionHolder) value;
return resolveInnerBean(argName, bdHolder.getBeanName(), bdHolder.getBeanDefinition());
}
else if (value instanceof BeanDefinition) {
BeanDefinition bd = (BeanDefinition) value;
return resolveInnerBean(argName, "(inner bean)", bd);
}
//对集合数组类型的属性解析
else if (value instanceof ManagedArray) {
ManagedArray array = (ManagedArray) value;
//获取数组的类型
Class elementType = array.resolvedElementType;
if (elementType == null) {
//获取数组元素的类型
String elementTypeName = array.getElementTypeName();
if (StringUtils.hasText(elementTypeName)) {
try {
//使用反射机制创建指定类型的对象
elementType = ClassUtils.forName(elementTypeName,
this.beanFactory.getBeanClassLoader());
array.resolvedElementType = elementType;
}
catch (Throwable ex) {
throw new BeanCreationException(
this.beanDefinition.getResourceDescription(), this.beanName,
"Error resolving array type for " + argName, ex);
}
}
//没有获取到数组的类型,也没有获取到数组元素的类型
//则直接设置数组的类型为 Object
else {
elementType = Object.class;
}
}
//创建指定类型的数组
return resolveManagedArray(argName, (List<?>) value, elementType);
}
//解析 list 类型的属性值
else if (value instanceof ManagedList) {
return resolveManagedList(argName, (List<?>) value);
}
//解析 set 类型的属性值
else if (value instanceof ManagedSet) {
return resolveManagedSet(argName, (Set<?>) value);
}
//解析 map 类型的属性值
else if (value instanceof ManagedMap) {
return resolveManagedMap(argName, (Map<?, ?>) value);
}
//解析 props 类型的属性值,props 其实就是 key 和 value 均为字符串的 map
else if (value instanceof ManagedProperties) {
Properties original = (Properties) value;
//创建一个拷贝,用于作为解析后的返回值
Properties copy = new Properties();
for (Map.Entry propEntry : original.entrySet()) {
Object propKey = propEntry.getKey();
Object propValue = propEntry.getValue();
if (propKey instanceof TypedStringValue) {
propKey = evaluate((TypedStringValue) propKey);
}
if (propValue instanceof TypedStringValue) {
propValue = evaluate((TypedStringValue) propValue);
}
copy.put(propKey, propValue);
}
return copy;
}
//解析字符串类型的属性值
else if (value instanceof TypedStringValue) {
TypedStringValue typedStringValue = (TypedStringValue) value;
Object valueObject = evaluate(typedStringValue);
try {
//获取属性的目标类型
Class<?> resolvedTargetType = resolveTargetType(typedStringValue);
if (resolvedTargetType != null) {
//对目标类型的属性进行解析,递归调用
return this.typeConverter.convertIfNecessary(valueObject, resolvedTargetType);
}
//没有获取到属性的目标对象,则按 Object 类型返回
else {
return valueObject;
}
}
catch (Throwable ex) {
throw new BeanCreationException(
this.beanDefinition.getResourceDescription(), this.beanName,
"Error converting typed String value for " + argName, ex);
}
}
else {
return evaluate(value);
}
}
//解析引用类型的属性值
private Object resolveReference(Object argName, RuntimeBeanReference ref) {
try {
//获取引用的 Bean 名称
String refName = ref.getBeanName();
refName = String.valueOf(evaluate(refName));
//如果引用的对象在父类容器中,则从父类容器中获取指定的引用对象
if (ref.isToParent()) {
if (this.beanFactory.getParentBeanFactory() == null) {
throw new BeanCreationException(
this.beanDefinition.getResourceDescription(), this.beanName,
"Can't resolve reference to bean '" + refName +
"' in parent factory: no parent factory available");
}
return this.beanFactory.getParentBeanFactory().getBean(refName);
}
//从当前的容器中获取指定的引用 Bean 对象,如果指定的 Bean 没有被实例化
//则会递归触发引用 Bean 的初始化和依赖注入
else {
Object bean = this.beanFactory.getBean(refName);
//将当前实例化对象的依赖引用对象
this.beanFactory.registerDependentBean(refName, this.beanName);
return bean;
}
}
catch (BeansException ex) {
throw new BeanCreationException(
this.beanDefinition.getResourceDescription(), this.beanName,
"Cannot resolve reference to bean '" + ref.getBeanName() + "' while setting " +
argName, ex);
}
}
//解析 array 类型的属性
private Object resolveManagedArray(Object argName, List<?> ml, Class elementType) {
//创建一个指定类型的数组,用于存放和返回解析后的数组
Object resolved = Array.newInstance(elementType, ml.size());
for (int i = 0; i < ml.size(); i++) {
//递归解析 array 的每一个元素,并将解析后的值设置到 resolved 数组中,索引为 i
Array.set(resolved, i,
resolveValueIfNecessary(new KeyedArgName(argName, i), ml.get(i)));
}
return resolved;
}
//解析 list 类型的属性
private List resolveManagedList(Object argName, List<?> ml) {
List<Object> resolved = new ArrayList<Object>(ml.size());
for (int i = 0; i < ml.size(); i++) {
//递归解析 list 的每一个元素
resolved.add(
resolveValueIfNecessary(new KeyedArgName(argName, i), ml.get(i)));
}
return resolved;
}
//解析 set 类型的属性
private Set resolveManagedSet(Object argName, Set<?> ms) {
Set<Object> resolved = new LinkedHashSet<Object>(ms.size());
int i = 0;
//递归解析 set 的每一个元素
for (Object m : ms) {
resolved.add(resolveValueIfNecessary(new KeyedArgName(argName, i), m));
i++;
}
return resolved;
}
//解析 map 类型的属性
private Map resolveManagedMap(Object argName, Map<?, ?> mm) {
Map<Object, Object> resolved = new LinkedHashMap<Object, Object>(mm.size());
//递归解析 map 中每一个元素的 key 和 value
for (Map.Entry entry : mm.entrySet()) {
Object resolvedKey = resolveValueIfNecessary(argName, entry.getKey());
Object resolvedValue = resolveValueIfNecessary(
new KeyedArgName(argName, entry.getKey()), entry.getValue());
resolved.put(resolvedKey, resolvedValue);
}
return resolved;
}

通过上面的代码分析,我们明白了 Spring 是如何将引用类型,内部类以及集合类型等属性进行解析的,
属性值解析完成后就可以进行依赖注入了,依赖注入的过程就是 Bean 对象实例设置到它所依赖的 Bean
对象属性上去,在第 7 步中我们已经说过,依赖注入是通过 bw.setPropertyValues 方法实现的,该方
法也使用了委托模式,在 BeanWrapper 接口中至少定义了方法声明,依赖注入的具体实现交由其实现类
BeanWrapperImpl 来完成,下面我们就分析依 BeanWrapperImpl 中赖注入相关的源码。

BeanWrapperImpl 对 Bean 属性的依赖注入

BeanWrapperImpl 类主要是对容器中完成初始化的 Bean 实例对象进行属性的依赖注入,即把 Bean 对象
设置到它所依赖的另一个 Bean 的属性中去,依赖注入的相关源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
//实现属性依赖注入功能
private void setPropertyValue(PropertyTokenHolder tokens, PropertyValue pv) throws BeansException {
//PropertyTokenHolder 主要保存属性的名称、路径,以及集合的 size 等信息
String propertyName = tokens.canonicalName;
String actualName = tokens.actualName;
//keys 是用来保存集合类型属性的 size
if (tokens.keys != null) {
//将属性信息拷贝
PropertyTokenHolder getterTokens = new PropertyTokenHolder();
getterTokens.canonicalName = tokens.canonicalName;
getterTokens.actualName = tokens.actualName;
getterTokens.keys = new String[tokens.keys.length - 1];
System.arraycopy(tokens.keys, 0, getterTokens.keys, 0, tokens.keys.length - 1);
Object propValue;
try {
//获取属性值,该方法内部使用 JDK 的内省( Introspector)机制
//调用属性的 getter(readerMethod)方法,获取属性的值
propValue = getPropertyValue(getterTokens);
} catch (NotReadablePropertyException ex) {
throw new NotWritablePropertyException(getRootClass(), this.nestedPath + propertyName,
"Cannot access indexed value in property referenced " +
"in indexed property path '" + propertyName + "'", ex);
}
//获取集合类型属性的长度
String key = tokens.keys[tokens.keys.length - 1];
if (propValue == null) {
throw new NullValueInNestedPathException(getRootClass(), this.nestedPath +
propertyName,
"Cannot access indexed value in property referenced " +
"in indexed property path '" + propertyName + "': returned null");
}
//注入 array 类型的属性值
else if (propValue.getClass().isArray()) {
//获取属性的描述符
PropertyDescriptor pd =
getCachedIntrospectionResults().getPropertyDescriptor(actualName);
//获取数组的类型
Class requiredType = propValue.getClass().getComponentType();
//获取数组的长度
int arrayIndex = Integer.parseInt(key);
Object oldValue = null;
try {
//获取数组以前初始化的值
if (isExtractOldValueForEditor()) {
oldValue = Array.get(propValue, arrayIndex);
}
//将属性的值赋值给数组中的元素
Object convertedValue = convertIfNecessary(propertyName, oldValue, pv.getValue(),
requiredType,
new PropertyTypeDescriptor(pd, new MethodParameter(pd.getReadMethod(),
-1), requiredType));
Array.set(propValue, arrayIndex, convertedValue);
} catch (IndexOutOfBoundsException ex) {
throw new InvalidPropertyException(getRootClass(), this.nestedPath + propertyName,
"Invalid array index in property path '" + propertyName + "'", ex);
}
}
//注入 list 类型的属性值
else if (propValue instanceof List) {
PropertyDescriptor pd =
getCachedIntrospectionResults().getPropertyDescriptor(actualName);
//获取 list 集合的类型
Class requiredType = GenericCollectionTypeResolver.getCollectionReturnType(
pd.getReadMethod(), tokens.keys.length);
List list = (List) propValue;
//获取 list 集合的 size
int index = Integer.parseInt(key);
Object oldValue = null;
if (isExtractOldValueForEditor() && index < list.size()) {
oldValue = list.get(index);
}
//获取 list 解析后的属性值
Object convertedValue = convertIfNecessary(propertyName, oldValue, pv.getValue(),
requiredType,
new PropertyTypeDescriptor(pd, new MethodParameter(pd.getReadMethod(), -1),
requiredType));
if (index < list.size()) {
//为 list 属性赋值
list.set(index, convertedValue);
}
//如果 list 的长度大于属性值的长度,则多余的元素赋值为 null
else if (index >= list.size()) {
for (int i = list.size(); i < index; i++) {
try {
list.add(null);
} catch (NullPointerException ex) {
throw new InvalidPropertyException(getRootClass(), this.nestedPath +
propertyName,
"Cannot set element with index " + index + " in List of size " +
list.size() + ", accessed using property path '" + propertyName +
"': List does not support filling up gaps with null elements");
}
}
list.add(convertedValue);
}
}
//注入 map 类型的属性值
else if (propValue instanceof Map) {
PropertyDescriptor pd =
getCachedIntrospectionResults().getPropertyDescriptor(actualName);
//获取 map 集合 key 的类型
Class mapKeyType = GenericCollectionTypeResolver.getMapKeyReturnType(
pd.getReadMethod(), tokens.keys.length);
//获取 map 集合 value 的类型
Class mapValueType = GenericCollectionTypeResolver.getMapValueReturnType(
pd.getReadMethod(), tokens.keys.length);
Map map = (Map) propValue;
//解析 map 类型属性 key 值
Object convertedMapKey = convertIfNecessary(null, null, key, mapKeyType,
new PropertyTypeDescriptor(pd, new MethodParameter(pd.getReadMethod(), -1),
mapKeyType));
Object oldValue = null;
if (isExtractOldValueForEditor()) {
oldValue = map.get(convertedMapKey);
}
//解析 map 类型属性 value 值
Object convertedMapValue = convertIfNecessary(
propertyName, oldValue, pv.getValue(), mapValueType,
new TypeDescriptor(new MethodParameter(pd.getReadMethod(), -1,
tokens.keys.length + 1)));
//将解析后的 key 和 value 值赋值给 map 集合属性
map.put(convertedMapKey, convertedMapValue);
} else {
throw new InvalidPropertyException(getRootClass(), this.nestedPath + propertyName,
"Property referenced in indexed property path '" + propertyName +
"' is neither an array nor a List nor a Map; returned value was [" + pv.getValue()
+ "]");
}
}
//对非集合类型的属性注入
else {
PropertyDescriptor pd = pv.resolvedDescriptor;
if (pd == null || !pd.getWriteMethod().getDeclaringClass().isInstance(this.object)) {
pd = getCachedIntrospectionResults().getPropertyDescriptor(actualName);
//无法获取到属性名或者属性没有提供 setter(写方法)方法
if (pd == null || pd.getWriteMethod() == null) {
//如果属性值是可选的,即不是必须的,则忽略该属性值
if (pv.isOptional()) {
logger.debug("Ignoring optional value for property '" + actualName +
"' - property not found on bean class [" + getRootClass().getName() +
"]");
return;
}
//如果属性值是必须的,则抛出无法给属性赋值,因为没提供 setter 方法异常
else {
PropertyMatches matches = PropertyMatches.forProperty(propertyName,
getRootClass());
throw new NotWritablePropertyException(
getRootClass(), this.nestedPath + propertyName,
matches.buildErrorMessage(), matches.getPossibleMatches());
}
}
pv.getOriginalPropertyValue().resolvedDescriptor = pd;
}
Object oldValue = null;
try {
Object originalValue = pv.getValue();
Object valueToApply = originalValue;
if (!Boolean.FALSE.equals(pv.conversionNecessary)) {
if (pv.isConverted()) {
valueToApply = pv.getConvertedValue();
} else {
if (isExtractOldValueForEditor() && pd.getReadMethod() != null) {
//获取属性的 getter 方法(读方法),JDK 内省机制
final Method readMethod = pd.getReadMethod();
//如果属性的 getter 方法不是 public 访问控制权限的,即访问控制权限比较严格,
//则使用 JDK 的反射机制强行访问非 public 的方法(暴力读取属性值)
if (!Modifier.isPublic(readMethod.getDeclaringClass().getModifiers()) &&
!readMethod.isAccessible()) {
if (System.getSecurityManager() != null) {
//匿名内部类,根据权限修改属性的读取控制限制
AccessController.doPrivileged(new PrivilegedAction<Object>() {
public Object run() {
readMethod.setAccessible(true);
return null;
}
});
} else {
readMethod.setAccessible(true);
}
}
try {
//属性没有提供 getter 方法时,调用潜在的读取属性值的方法,获取属性值
if (System.getSecurityManager() != null) {
oldValue = AccessController.doPrivileged(new PrivilegedExceptionAction<Object>() {
public Object run() throws Exception {
return readMethod.invoke(object);
}
}, acc);
} else {
oldValue = readMethod.invoke(object);
}
} catch (Exception ex) {
if (ex instanceof PrivilegedActionException) {
ex = ((PrivilegedActionException) ex).getException();
}
if (logger.isDebugEnabled()) {
logger.debug("Could not read previous value of property '" +
this.nestedPath + propertyName + "'", ex);
}
}
}
//设置属性的注入值
valueToApply = convertForProperty(propertyName, oldValue, originalValue, pd);
}
pv.getOriginalPropertyValue().conversionNecessary = (valueToApply !=
originalValue);
}
//根据 JDK 的内省机制,获取属性的 setter(写方法)方法
final Method writeMethod = (pd instanceof GenericTypeAwarePropertyDescriptor ?
((GenericTypeAwarePropertyDescriptor) pd).getWriteMethodForActualAccess() :
pd.getWriteMethod());
//如果属性的 setter 方法是非 public,即访问控制权限比较严格,则使用 JDK 的反射机制,
//强行设置 setter 方法可访问(暴力为属性赋值)
if (!Modifier.isPublic(writeMethod.getDeclaringClass().getModifiers())
&& !writeMethod.isAccessible()) {
//如果使用了 JDK 的安全机制,则需要权限验证
if (System.getSecurityManager() != null) {
AccessController.doPrivileged(new PrivilegedAction<Object>() {
public Object run() {
writeMethod.setAccessible(true);
return null;
}
});
} else {
writeMethod.setAccessible(true);
}
}
final Object value = valueToApply;
if (System.getSecurityManager() != null) {
try {
//将属性值设置到属性上去
AccessController.doPrivileged(new PrivilegedExceptionAction<Object>() {
public Object run() throws Exception {
writeMethod.invoke(object, value);
return null;
}
}, acc);
} catch (PrivilegedActionException ex) {
throw ex.getException();
}
} else {
writeMethod.invoke(this.object, value);
}
} catch (TypeMismatchException ex) {
throw ex;
} catch (InvocationTargetException ex) {
PropertyChangeEvent propertyChangeEvent =
new PropertyChangeEvent(this.rootObject, this.nestedPath + propertyName,
oldValue, pv.getValue());
if (ex.getTargetException() instanceof ClassCastException) {
throw new TypeMismatchException(propertyChangeEvent, pd.getPropertyType(),
ex.getTargetException());
} else {
throw new MethodInvocationException(propertyChangeEvent,
ex.getTargetException());
}
} catch (Exception ex) {
PropertyChangeEvent pce =
new PropertyChangeEvent(this.rootObject, this.nestedPath + propertyName,
oldValue, pv.getValue());
throw new MethodInvocationException(pce, ex);
}
}
}

通过对上面注入依赖代码的分析,我们已经明白了 Spring IOC 容器是如何将属性的值注入到 Bean 实
例对象中去的:

  • 对于集合类型的属性,将其属性值解析为目标类型的集合后直接赋值给属性。
  • 对于非集合类型的属性,大量使用了 JDK 的反射和内省机制,通过属性的 getter 方法(reader method)获取指定属性注入以前的值,同时调用属性的 setter 方法(writer method)为属性设置注入后的值。看到这里相信很多人都明白了 Spring 的 setter 注入原理。

至此 Spring IOC 容器对 Bean 定义资源文件的定位,载入、解析和依赖注入已经全部分析完毕,现在
Spring IOC 容器中管理了一系列靠依赖关系联系起来的 Bean,程序不需要应用自己手动创建所需的对
象,Spring IOC 容器会在我们使用的时候自动为我们创建,并且为我们注入好相关的依赖,这就是
Spring 核心功能的控制反转和依赖注入的相关功能。

spring - 简介

发表于 2019-10-22 | 更新于 2019-11-06 | 分类于 Java | 评论数: | 阅读次数:

spring

¶轻量级

  • 零配置变成
  • API使用简单

¶面向bean

  • 只需要编写非常普通的bean

¶松耦合

  • 充分利用AOP思想

¶万能胶

  • 与主流框架无缝集成

¶设计模式

  • 将java中进店调的设计模式运用的淋漓尽致

¶简化开发

  • 基于entry的轻量级和最小侵入性变成
  • 通过依赖注入和面向接口松耦合
  • 基于且米娜和惯性进行声明式变成
  • 通过切面和模板减少样版式代码

IOC、IOC容器

业务人员不需要实例化和对对象的管理,只需要让spring知道对象创建的方式,剩下的事情都是spring来做。
等到需要使用它的时候直接拿来用就行了。
ioc容器其实就是存储实例化的bean

¶注入的方式

  • set方法
  • 构造方法
  • 强制赋值

AOP

主要思想是:解耦
过程:先把一个有规律的整体拆了,分别开发,等到发布的时候,再组装起来。

  • 事务 transaction
  • 权限认证 authentication
  • 日志 logging
  • 懒加载 lazy loading
  • 上下文处理 context process
  • 错误跟踪(异常捕获机制) error handler
  • cache 缓存

整体架构


spring 4架构图:


源码编译

官方github

¶不同版本的源码要对应不同的gradle、jdk

文件夹内有相应的README.MD文件,可以查看具体的环境配置


修改Gradlew.bat中相应的版本配置

解压后的源码不被ide工具识别,需要进行编译转换,解压后的文件夹内有相关的转换指导

1
2
3
idea 导入方法,执行以下两个命令
gradlew.bat
gradlew.bat cleanIdea :spring-oxm:compileTestJava

¶3.2.6.RELEASE -> 1.6 gradle / jdk7

¶3.2.6.RELEASE 已编译版

3.2.6.RELEASE 已编译版下载

¶5.0.4.RELEASE 已编译版

5.0.4.RELEASE 已编译版下载

synchronized 和 Lock

发表于 2019-10-22 | 更新于 2019-10-24 | 分类于 Java | 评论数: | 阅读次数:

锁的相关概念介绍

¶可重入锁

如果锁具备可重入性,则称作为可重入锁。像synchronized和ReentrantLock都是可重入锁,可重入性在我看来实际上表明了锁的分配机制:基于线程的分配,而不是基于方法调用的分配。举个简单的例子,当一个线程执行到某个synchronized方法时,比如说method1,而在method1中会调用另外一个synchronized方法method2,此时线程不必重新去申请锁,而是可以直接执行方法method2。

看下面这段代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* LockTest class
*
* @author 蒋时华
* @date 2017/09/22
*/
public class LockTest {

public static void main(String[] args) {
final LockTest test = new LockTest();

new Thread(()->{
test.get();
}).start();

new Thread(()->{
test.get();
}).start();

}

public synchronized void get() {
method2();
}

public synchronized void method2() {

}

}

你认为结果会是怎样的呢,会不会造成死锁呢?

上述代码中的两个方法method1和method2都用synchronized修饰了,假如某一时刻,线程A执行到了method1,此时线程A获取了这个对象的锁,而由于method2也是synchronized方法,假如synchronized不具备可重入性,此时线程A需要重新申请锁。但是这就会造成一个问题,因为线程A已经持有了该对象的锁,而又在申请获取该对象的锁,这样就会线程A一直等待永远不会获取到的锁。
而由于synchronized和Lock都具备可重入性,所以不会发生上述现象

¶可中断锁

可中断锁:顾名思义,就是可以相应中断的锁。
在Java中,synchronized就不是可中断锁,而Lock是可中断锁。
如果某一线程A正在执行锁中的代码,另一线程B正在等待获取该锁,可能由于等待时间过长,线程B不想等待了,想先处理其他事情,我们可以让它中断自己或者在别的线程中中断它,这种就是可中断锁。
在前面演示lockInterruptibly()的用法时已经体现了Lock的可中断性。

¶公平锁

公平锁即尽量以请求锁的顺序来获取锁。比如同是有多个线程在等待一个锁,当这个锁被释放时,等待时间最久的线程(最先请求的线程)会获得该所,这种就是公平锁。
非公平锁即无法保证锁的获取是按照请求锁的顺序进行的。这样就可能导致某个或者一些线程永远获取不到锁。
在Java中,synchronized就是非公平锁,它无法保证等待的线程获取锁的顺序。
而对于ReentrantLock和ReentrantReadWriteLock,它默认情况下是非公平锁,但是可以设置为公平锁。

在ReentrantLock中定义了2个静态内部类,一个是NotFairSync,一个是FairSync,分别用来实现非公平锁和公平锁。
我们可以在创建ReentrantLock对象时,通过以下方式来设置锁的公平性:

1
ReentrantLock lock = new ReentrantLock(true);


另外在ReentrantLock类中定义了很多方法,比如:

  • isFair() //判断锁是否是公平锁
  • isLocked() //判断锁是否被任何线程获取了
  • isHeldByCurrentThread() //判断锁是否被当前线程获取了
  • hasQueuedThreads() //判断是否有线程在等待该锁

在ReentrantReadWriteLock中也有类似的方法,同样也可以设置为公平锁和非公平锁。不过要记住,ReentrantReadWriteLock并未实现Lock接口,它实现的是ReadWriteLock接口。

¶读写锁

读写锁将对一个资源(比如文件)的访问分成了2个锁,一个读锁和一个写锁。
正因为有了读写锁,才使得多个线程之间的读操作不会发生冲突。
ReadWriteLock就是读写锁,它是一个接口,ReentrantReadWriteLock实现了这个接口。
可以通过readLock()获取读锁,通过writeLock()获取写锁。

synchronized

synchronized可以保证方法或者代码块在运行时,同一时刻只有一个方法可以进入到临界区,只有一个线程可以执行某个方法或某个代码块,同时synchronized可以保证一个线程的变化可见(可见性),即可以代替volatile,保证共享变量的内存可见性

¶常见的几种应用是:

  • synchronized(this)、synchronize方法
  • synchronized(class)、synchronized静态同步方法
  • synchronized(Object)

¶synchronized(this)、synchronize方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Lock1 class
*
* @author 蒋时华
* @date 2017/09/22
*/
public class SynchronizedTest {

public static void main(String[] args) throws InterruptedException {
SynchronizedTest synchronizedTest = new SynchronizedTest();
ExecutorService executorService = Executors.newFixedThreadPool(100);

executorService.execute(()->{
try {
synchronizedTest.test();
} catch (Exception e) {
e.printStackTrace();
}
});

// 保证第一个线程先启动并执行
Thread.sleep(1000);

executorService.execute(()->{
try {
synchronizedTest.test2();
} catch (Exception e) {
e.printStackTrace();
}
});
}

public void test() throws InterruptedException {
synchronized (this){
System.out.println("test start");
Thread.sleep(5000);
System.out.println("test end");
}
}
public void test2() {
synchronized (this){
System.out.println("test22222222 start");
System.out.println("test22222222 end");
}
}

}

执行结果

1
2
3
4
test start
test end
test22222222 start
test22222222 end

将test2方法的锁去掉

1
2
3
4
public void test2() {
System.out.println("test22222222 start");
System.out.println("test22222222 end");
}

执行结果

1
2
3
4
test start
test22222222 start
test22222222 end
test end

将test2改成方法锁

1
2
3
4
public synchronized void test2() {
System.out.println("test22222222 start");
System.out.println("test22222222 end");
}

执行结果

1
2
3
4
test start
test end
test22222222 start
test22222222 end

¶总结

上面的结果我们能看到即使 test方法 耗时较长,test2方法 也并不会获得执行的机会,
如果将 test2方法 的同步锁去掉,test2方法 就可以执行
如果将 test2方法 改成方法锁,执行结果和 synchronized(this) 结果相同
so,
synchronized (this)使用的对象监视器该对象自身, 当一个线程访问SynchronizedTest中的一个synchronized (this)同步代码块时,其它线程对同一个SynchronizedTest中的synchronized (this)【包括本方法和其它被修饰的方法】同步代码块的访问将是堵塞,实现了代码顺序的同步执行
synchronized(this) == synchronize方法 【前提是this不是写在其他线程里的,如果是写在其他线程里的,则代表这个线程对象】

¶synchronized(Object)

¶object
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Lock1 class
*
* @author 蒋时华
* @date 2017/09/22
*/
public class SynchronizedTest {

private Object objectLock = new Object();

public static void main(String[] args) throws InterruptedException {
SynchronizedTest synchronizedTest = new SynchronizedTest();
ExecutorService executorService = Executors.newFixedThreadPool(100);

executorService.execute(()->{
try {
synchronizedTest.test("线程A");
} catch (Exception e) {
e.printStackTrace();
}
});

// 保证第一个线程先启动并执行
Thread.sleep(1000);

executorService.execute(()->{
try {
synchronizedTest.test("线程B");
} catch (Exception e) {
e.printStackTrace();
}
});
}

public void test(String msg) throws InterruptedException {
synchronized (objectLock){
System.out.println(msg + " test start");
Thread.sleep(5000);
System.out.println(msg + " test end");
}
}

}

执行结果

1
2
3
4
线程A test start
线程A test end
线程B test start
线程B test end

将 objectLock 锁对象放入到 方法内

1
2
3
4
5
6
7
8
public void test(String msg) throws InterruptedException {
Object objectLock = new Object();
synchronized (objectLock){
System.out.println(msg + " test start");
Thread.sleep(5000);
System.out.println(msg + " test end");
}
}

执行结果

1
2
3
4
线程A test start
线程B test start
线程A test end
线程B test end

¶总结

object 和 this 本质上是一样的,只不过this是当前对象,而object是我们额外引入的对象

¶synchronized(class)、synchronized静态同步方法

¶class
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* LockTest class
*
* @author 蒋时华
* @date 2017/09/22
*/
public class LockTest {

public static void main(String[] args) {

LockTest lockTest = new LockTest();
LockTest lockTest2 = new LockTest();

new Thread(()->{
synchronized (LockTest.class){
lockTest.print();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();

new Thread(()->{
synchronized (LockTest.class){
lockTest2.print();
}
}).start();

}

public void print(){
System.out.println("父类正在执行");
}

}

执行结果发现 线程2 也是无法执行的,至到 线程1 释放锁

¶静态同步synchronized方法

把 print 方法改成静态锁方法,发现结果和锁class是一样的

¶总结

静态同步synchronized方法 默认锁的是当前的.class对象,
通过实例可以发现 class 锁的当前对象的所有实例,区别于this,this只是锁当前的实例对象
so, synchronized(class) == synchronized静态同步方法

Lock

¶概念

synchronized是java中的一个关键字,也就是说是Java语言内置的特性。那么为什么会出现Lock呢?
如果一个代码块被synchronized修饰了,当一个线程获取了对应的锁,并执行该代码块时,其他线程便只能一直等待,等待获取锁的线程释放锁,而这里获取锁的线程释放锁只会有两种情况:

  • 获取锁的线程执行完了该代码块,然后线程释放对锁的占有
  • 线程执行发生异常,此时JVM会让线程自动释放锁。

那么如果这个获取锁的线程由于要等待IO或者其他原因(比如调用sleep方法)被阻塞了,但是又没有释放锁,其他线程便只能干巴巴地等待,试想一下,这多么影响程序执行效率。
因此就需要有一种机制可以不让等待的线程一直无期限地等待下去(比如只等待一定的时间或者能够响应中断),通过Lock就可以办到。
再举个例子:当有多个线程读写文件时,读操作和写操作会发生冲突现象,写操作和写操作会发生冲突现象,但是读操作和读操作不会发生冲突现象。
但是采用synchronized关键字来实现同步的话,就会导致一个问题:
如果多个线程都只是进行读操作,所以当一个线程在进行读操作时,其他线程只能等待无法进行读操作。
因此就需要一种机制来使得多个线程都只是进行读操作时,线程之间不会发生冲突,通过Lock就可以办到。

另外,通过Lock可以知道线程有没有成功获取到锁。这个是synchronized无法办到的。
总结一下,也就是说Lock提供了比synchronized更多的功能。但是要注意以下几点:

  • Lock不是Java语言内置的,synchronized是Java语言的关键字,因此是内置特性。Lock是一个类,通过这个类可以实现同步访问;
  • Lock和synchronized有一点非常大的不同,采用synchronized不需要用户去手动释放锁,当synchronized方法或者synchronized代码块执行完之后,系统会自动让线程释放对锁的占用;而Lock则必须要用户去手动释放锁,如果没有主动释放锁,就有可能导致出现死锁现象。

首先要说明的就是Lock,通过查看Lock的源码可知,Lock是一个接口:

1
2
3
4
5
6
7
8
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}

lock()、tryLock()、tryLock(long time, TimeUnit unit)和lockInterruptibly()是用来获取锁的
unLock()方法是用来释放锁的

在Lock中声明了四个方法来获取锁,那么这四个方法有何区别呢?
首先lock()方法是平常使用得最多的一个方法,就是用来获取锁。如果锁已被其他线程获取,则进行等待

¶lock()

由于在前面讲到如果采用Lock,必须主动去释放锁,并且在发生异常时,不会自动释放锁。因此一般来说,使用Lock必须在try{}catch{}块中进行,并且将释放锁的操作放在finally块中进行,以保证锁一定被被释放,防止死锁的发生。

通常使用Lock来进行同步的话,是以下面这种形式去使用的:

1
2
3
4
5
6
7
8
9
Lock lock = ...;
lock.lock();
try{
//处理任务
}catch(Exception ex){

}finally{
lock.unlock(); //释放锁
}
¶tryLock()、tryLock(long time, TimeUnit unit)

tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false,也就说这个方法无论如何都会立即返回。在拿不到锁时不会一直在那等待。
tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true

所以,一般情况下通过tryLock来获取锁时是这样使用的:

1
2
3
4
5
6
7
8
9
10
11
12
Lock lock = ...;
if(lock.tryLock()) {
try{
//处理任务
}catch(Exception ex){

}finally{
lock.unlock(); //释放锁
}
}else {
//如果不能获取锁,则直接做其他事情
}
¶lockInterruptibly()

lockInterruptibly()方法比较特殊,当通过这个方法去获取锁时,如果线程正在等待获取锁,则这个线程能够响应中断,即中断线程的等待状态。也就使说,当两个线程同时通过lock.lockInterruptibly()想获取某个锁时,假若此时线程A获取到了锁,而线程B只有在等待,那么对线程B调用threadB.interrupt()方法能够中断线程B的等待过程。

由于lockInterruptibly()的声明中抛出了异常,所以lock.lockInterruptibly()必须放在try块中或者在调用lockInterruptibly()的方法外声明抛出InterruptedException。

因此lockInterruptibly()一般的使用形式如下:

1
2
3
4
5
6
7
8
9
public void method() throws InterruptedException {
lock.lockInterruptibly();
try {
//.....
}
finally {
lock.unlock();
}
}

注意:
当一个线程获取了锁之后,是不会被interrupt()方法中断的。单独调用interrupt()方法不能中断正在运行过程中的线程,只能中断阻塞过程中的线程。

¶ReentrantLock

ReentrantLock,意思是“可重入锁”,ReentrantLock是唯一实现了Lock接口的类,并且ReentrantLock提供了更多的方法。
具体的使用和synchronized 差不多,都需要注意锁的对象,不要把lock对象定义在方法中,导致线程不能共享锁

lock()、tryLock()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import java.util.ArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

**
* LockTest class
*
* @author 蒋时华
* @date 2017/09/22
*/
public class LockTest {

private ArrayList<Integer> arrayList = new ArrayList<Integer>();
Lock lock = new ReentrantLock();

public static void main(String[] args) {
final LockTest test = new LockTest();

new Thread(()->{
test.lock(Thread.currentThread());
}).start();

new Thread(()->{
test.lock(Thread.currentThread());
}).start();
}

public void lock(Thread thread) {
lock.lock();
try {
System.out.println(thread.getName()+"得到了锁");

// TODO: 2019/10/22 执行业务逻辑
} catch (Exception e) {
// TODO: handle exception
}finally {
System.out.println(thread.getName()+"释放了锁");
lock.unlock();
}
}

public void tryLock(Thread thread) {
if(lock.tryLock()) {
try {
System.out.println(thread.getName()+"得到了锁");

// TODO: 2019/10/22 执行业务逻辑
} catch (Exception e) {
// TODO: handle exception
}finally {
System.out.println(thread.getName()+"释放了锁");
lock.unlock();
}
} else {
System.out.println(thread.getName()+"获取锁失败");
}
}

}

lockInterruptibly()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* LockTest class
*
* @author 蒋时华
* @date 2017/09/22
*/
public class LockTest {

Lock lock = new ReentrantLock();

public static void main(String[] args) throws InterruptedException {
final LockTest test = new LockTest();

Thread thread1 = new Thread(() -> {
try {
test.lockInterruptibly(Thread.currentThread());
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "被中断");
}
});
thread1.start();


Thread thread2 = new Thread(() -> {
try {
test.lockInterruptibly(Thread.currentThread());
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "被中断");
}
});
thread2.start();


Thread.sleep(2000);

thread2.interrupt();

Thread.sleep(10000);
}

public void lockInterruptibly(Thread thread) throws InterruptedException {
//注意,如果需要正确中断等待锁的线程,必须将获取锁放在外面,然后将InterruptedException抛出
lock.lockInterruptibly();
try {
System.out.println(thread.getName()+"得到了锁");

Thread.sleep(5000);

// TODO: 2019/10/22 执行业务逻辑
}
finally {
System.out.println(Thread.currentThread().getName()+"执行finally");
lock.unlock();
System.out.println(thread.getName()+"释放了锁");
}
}

}

运行之后,发现thread2能够被正确中断。

¶ReadWriteLock

ReadWriteLock也是一个接口,在它里面只定义了两个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading.
*/
Lock readLock();

/**
* Returns the lock used for writing.
*
* @return the lock used for writing.
*/
Lock writeLock();
}

一个用来获取读锁,一个用来获取写锁。也就是说将文件的读写操作分开,分成2个锁来分配给线程,从而使得多个线程可以同时进行读操作。下面的ReentrantReadWriteLock实现了ReadWriteLock接口。

¶ReentrantReadWriteLock

ReentrantReadWriteLock里面提供了很多丰富的方法,不过最主要的有两个方法:readLock()和writeLock()用来获取读锁和写锁。
假如有多个线程要同时进行读操作的话,先看一下synchronized达到的效果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* LockTest class
*
* @author 蒋时华
* @date 2017/09/22
*/
public class LockTest {

private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

public static void main(String[] args) {
final LockTest test = new LockTest();

new Thread(()->{
test.get(Thread.currentThread());
}).start();

new Thread(()->{
test.get(Thread.currentThread());
}).start();

}

public synchronized void get(Thread thread) {
long start = System.currentTimeMillis();
while(System.currentTimeMillis() - start <= 1) {
System.out.println(thread.getName()+"正在进行读操作");
}
System.out.println(thread.getName()+"读操作完毕");
}

}

这段程序的输出结果会是,直到thread1执行完读操作之后,才会打印thread2执行读操作的信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-0读操作完毕
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1读操作完毕

而改成用读写锁的话:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* LockTest class
*
* @author 蒋时华
* @date 2017/09/22
*/
public class LockTest {

private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

public static void main(String[] args) {
final LockTest test = new LockTest();

new Thread(()->{
test.get(Thread.currentThread());
}).start();

new Thread(()->{
test.get(Thread.currentThread());
}).start();

}

public void get(Thread thread) {
rwl.readLock().lock();
try {
long start = System.currentTimeMillis();

while(System.currentTimeMillis() - start <= 1) {
System.out.println(thread.getName()+"正在进行读操作");
}
System.out.println(thread.getName()+"读操作完毕");
} finally {
rwl.readLock().unlock();
}
}

}

执行结果

1
2
3
4
5
6
7
Thread-0正在进行读操作
Thread-0正在进行读操作
Thread-1正在进行读操作
Thread-0读操作完毕
Thread-1正在进行读操作
Thread-1正在进行读操作
Thread-1读操作完毕

说明thread1和thread2在同时进行读操作。
这样就大大提升了读操作的效率。
不过要注意的是,如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁。
如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁。

¶Condition 类

Condition与重入锁是通过lock.newCondition()方法产生一个与当前重入锁绑定的Condtion实例,我们通知该实例来控制线程的等待与通知。该接口的所有方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

public interface Condition {
//当前线程进入等待状态,直到被通知(signal)或者被中断时,当前线程进入运行状态,从await()返回。与Object.wait()类似。
void await() throws InterruptedException;

//当前线程进入等待状态,直到被通知,对中断不做响应;
//线程在调用condition.await()后处于await状态,此时调用thread.interrupt()会报错
//但是使用condition.awaitUninterruptibly()后,调用thread.interrupt()则不会报错
void awaitUninterruptibly();

//nanosTimeout指定该方法等待信号的的最大时间(单位为纳秒)。若指定时间内收到signal()或signalALL()则返回nanosTimeout减去已经等待的时间;
//若指定时间内有其它线程中断该线程,则抛出InterruptedException并清除当前线程的打断状态;
//增加了超时响应,返回值表示当前剩余的时间,如果在nanosTimeout之前被唤醒,返回值 = nanosTimeout - 实际消耗的时间,返回值 <= 0表示超时;
long awaitNanos(long nanosTimeout) throws InterruptedException;

//与await()基本一致,唯一不同点在于,返回值返回true/false,在time之前被唤醒,返回true,超时返回false。
boolean await(long time, TimeUnit unit) throws InterruptedException;

//适用条件与行为与awaitNanos(long nanosTimeout)完全一样,唯一不同点在于它不是等待指定时间,而是等待由参数指定的某一时刻。
boolean awaitUntil(Date deadline) throws InterruptedException;

//唤醒一个在 await()等待队列中的线程。与Object.notify()相似
void signal();

//唤醒 await()等待队列中所有的线程。与object.notifyAll()相似
void signalAll();
}

¶CountDownLatch

  • countDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。
  • 是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。

countDownLatch类中只提供了一个构造器:

1
2
//参数count为计数值
public CountDownLatch(int count) { };

类中有三个方法是最重要的:

1
2
3
4
5
6
//调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public void await() throws InterruptedException { };
//和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };
//将count值减1
public void countDown() { };

¶CyclicBarrier

1
2
3
4
5
6
//调用await方法的线程告诉CyclicBarrier自己已经到达同步点,然后当前线程被阻塞。直到parties个参与线程调用了await方法,CyclicBarrier同样提供带超时时间的await和不带超时时间的await方法
await()
//broken标识该当前CyclicBarrier是否已经处于中断状态。
//默认barrier是没有损坏的。当barrier损坏了或者有一个线程中断了,则通过breakBarrier()来终止所有的线程
breakBarrier()
...

dowait(boolean, long)方法,它也是CyclicBarrier的核心方法,该方法定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 获取独占锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 当前代
final Generation g = generation;
// 如果这代损坏了,抛出异常
if (g.broken)
throw new BrokenBarrierException();

// 如果线程中断了,抛出异常
if (Thread.interrupted()) {
// 将损坏状态设置为true
// 并通知其他阻塞在此栅栏上的线程
breakBarrier();
throw new InterruptedException();
}

// 获取下标
int index = --count;
// 如果是 0,说明最后一个线程调用了该方法
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
// 执行栅栏任务
if (command != null)
command.run();
ranAction = true;
// 更新一代,将count重置,将generation重置
// 唤醒之前等待的线程
nextGeneration();
return 0;
} finally {
// 如果执行栅栏任务的时候失败了,就将损坏状态设置为true
if (!ranAction)
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
// 如果没有时间限制,则直接等待,直到被唤醒
if (!timed)
trip.await();
// 如果有时间限制,则等待指定时间
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 当前代没有损坏
if (g == generation && ! g.broken) {
// 让栅栏失效
breakBarrier();
throw ie;
} else {
// 上面条件不满足,说明这个线程不是这代的
// 就不会影响当前这代栅栏的执行,所以,就打个中断标记
Thread.currentThread().interrupt();
}
}

// 当有任何一个线程中断了,就会调用breakBarrier方法
// 就会唤醒其他的线程,其他线程醒来后,也要抛出异常
if (g.broken)
throw new BrokenBarrierException();

// g != generation表示正常换代了,返回当前线程所在栅栏的下标
// 如果 g == generation,说明还没有换代,那为什么会醒了?
// 因为一个线程可以使用多个栅栏,当别的栅栏唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。
// 正是因为这个原因,才需要generation来保证正确。
if (g != generation)
return index;

// 如果有时间限制,且时间小于等于0,销毁栅栏并抛出异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 释放独占锁
lock.unlock();
}
}

dowait(boolean, long)方法的主要逻辑处理比较简单,如果该线程不是最后一个调用await方法的线程,则它会一直处于等待状态,除非发生以下情况:

  • 最后一个线程到达,即index == 0
  • 某个参与线程等待超时
  • 某个参与线程被中断
  • 调用了CyclicBarrier的reset()方法。该方法会将屏障重置为初始状态
    在上面的源代码中,我们可能需要注意Generation 对象,在上述代码中我们总是可以看到抛出BrokenBarrierException异常,那么什么时候抛出异常呢?如果一个线程处于等待状态时,如果其他线程调用reset(),或者调用的barrier原本就是被损坏的,则抛出BrokenBarrierException异常。同时,任何线程在等待时被中断了,则其他所有线程都将抛出BrokenBarrierException异常,并将barrier置于损坏状态。

同时,Generation描述着CyclicBarrier的更新换代。在CyclicBarrier中,同一批线程属于同一代。当有parties个线程到达barrier之后,generation就会被更新换代。其中broken标识该当前CyclicBarrier是否已经处于中断状态。

1
2
3
private static class Generation {
boolean broken = false;
}

默认barrier是没有损坏的。当barrier损坏了或者有一个线程中断了,则通过breakBarrier()来终止所有的线程:

1
2
3
4
5
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}

在breakBarrier()中除了将broken设置为true,还会调用signalAll将在CyclicBarrier处于等待状态的线程全部唤醒。

当所有线程都已经到达barrier处(index == 0),则会通过nextGeneration()进行更新换地操作,在这个步骤中,做了三件事:唤醒所有线程,重置count,generation:

1
2
3
4
5
6
7
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}

除了上面讲到的栅栏更新换代以及损坏状态,我们在使用CyclicBarrier时还要要注意以下几点:

  • CyclicBarrier使用独占锁来执行await方法,并发性可能不是很高
  • 如果在等待过程中,线程被中断了,就抛出异常。但如果中断的线程所对应的CyclicBarrier不是这代的,比如,在最后一次线程执行signalAll后,并且更新了这个“代”对象。在这个区间,这个线程被中断了,那么,JDK认为任务已经完成了,就不必在乎中断了,只需要打个标记。该部分源码已在dowait(boolean, long)方法中进行了注释。
  • 如果线程被其他的CyclicBarrier唤醒了,那么g肯定等于generation,这个事件就不能return了,而是继续循环阻塞。反之,如果是当前CyclicBarrier唤醒的,就返回线程在CyclicBarrier的下标。完成了一次冲过栅栏的过程。该部分源码已在dowait(boolean, long)方法中进行了注释。

我们自定义的工作线程必须要等所有参与线程开始之后才可以执行,我们可以使用CyclicBarrier类来帮助我们完成。从程序的执行结果中也可以看出,所有的工作线程都运行await()方法之后都到达了栅栏位置,然后,工作线程才开始执行业务处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class CyclicBarrier {

private static class Generation {
boolean broken = false;
}

// 独占锁成员
private final ReentrantLock lock = new ReentrantLock();
// 条件成员
private final Condition trip = lock.newCondition();
// 必须满足障碍条件的线程个数
private final int parties;
// 当障碍条件满足会被自动执行的任务
private final Runnable barrierCommand;
// 当前世代(cyclicBarrier可重复利用,每一次利用是一个世代)
private Generation generation = new Generation();
// 满足障碍锁条件还需要的阻塞线程个数
private int count;

...
..
.
// 构造函数
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
// 初始化障碍条件总数
this.parties = parties;
// 初始化阻塞线程个数
this.count = parties;
// 初始化条件满足时自动执行任务
this.barrierCommand = barrierAction;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
public int await() throws InterruptedException, BrokenBarrierException {
try {
// 调用dowait来实现。
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen;
}
}


private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 获取独占锁。下面的操作都是同步的。
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 设置当前世代
final Generation g = generation;

if (g.broken)
throw new BrokenBarrierException();

if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 由于本线程调用await(),则需要的阻塞线程个数-1。
int index = --count;
// 若为0则代表障碍条件满足
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 执行注册的自动执行任务
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 设置世代为下一世代,以方便障碍锁的二次利用。
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// 通过循环
for (;;) {
try {
// 若没有阻塞时间限制,则阻塞
if (!timed)
trip.await();
// 若有阻塞时间限制,则阻塞相应的时间
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();
// 阻塞线程被唤醒后,若generation已被更新则代表障碍条件达成,线程继续执行。
if (g != generation)
return index;
// 阻塞线程若超过了阻塞时间,被唤醒后,则抛出异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 释放同步锁。
lock.unlock();
}
}


.....
...
..
// 更新当前障碍锁世代。
private void nextGeneration() {
// 唤醒所有在当前障碍锁上阻塞的线程
trip.signalAll();
// reset障碍锁条件
count = parties;
// 初始化新世代
generation = new Generation();
}

¶CyclicBarrier和CountDownLatch的区别

  • CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,可以使用多次,所以CyclicBarrier能够处理更为复杂的场景;
  • CyclicBarrier还提供了一些其他有用的方法,比如getNumberWaiting()方法可以获得CyclicBarrier阻塞的线程数量,isBroken()方法用来了解阻塞的线程是否被中断;
  • CountDownLatch允许一个或多个线程等待一组事件的产生,而CyclicBarrier用于等待其他线程运行到栅栏位置。
  • CountDownLatch : 一个线程(或者多个), 等待另外N个线程完成某个事情之后才能执行。 CyclicBarrier : N个线程相互等待,任何一个线程完成之前,所有的线程都必须等待。这样应该就清楚一点了,对于CountDownLatch来说,重点是那个“一个线程”, 是它在等待, 而另外那N的线程在把“某个事
  • 情”做完之后可以继续等待,可以终止。而对于CyclicBarrier来说,重点是那N个线程,他们之间任何一个没有完成,所有的线程都必须等待。

Lock和synchronized的选择

总结来说,Lock和synchronized有以下几点不同:

  • Lock是一个接口,而synchronized是Java中的关键字,synchronized是内置的语言实现;
  • synchronized在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而Lock在发生异常时,如果没有主动通过unLock()去释放锁,则很可能造成死锁现象,因此使用Lock时需要在finally块中释放锁;
  • Lock可以让等待锁的线程响应中断,而synchronized却不行,使用synchronized时,等待的线程会一直等待下去,不能够响应中断;
  • 通过Lock可以知道有没有成功获取锁,而synchronized却无法办到。
  • Lock可以提高多个线程进行读操作的效率。
    在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时Lock的性能要远远优于synchronized。所以说,在具体使用时要根据适当情况选择。

分布式架构的演化

发表于 2019-10-22 | 更新于 2019-11-26 | 分类于 Java | 评论数: | 阅读次数:

多线程题目

发表于 2019-10-22 | 更新于 2019-10-24 | 分类于 Java | 评论数: | 阅读次数:

¶实现在一个程序中同时完成如下两个任务,任务一:能将ASCII值为1到100对应的字符输出到控制台;任务二:能将1-100的数以数输出。要求他们交叉输出。

方法一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Test1 class
*
* @author 蒋时华
* @date 2019/10/23
*/
public class Test {

public static void main(String[] args) {

ExecutorService executorService = Executors.newFixedThreadPool(100);

Test test = new Test();
executorService.execute(()->{
test.ascll();
});
executorService.execute(()->{
test.number();
});

}

public synchronized void ascll(){
for (int i = 1; i < 100; i++) {
System.out.println((char)i);
notify();
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public synchronized void number(){
for (int i = 1; i < 100; i++) {
System.out.println(i);
notify();
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

}
方法二

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Test1 class
*
* @author 蒋时华
* @date 2019/10/23
*/
public class Test {

volatile boolean lock = false;

public static void main(String[] args) {

ExecutorService executorService = Executors.newFixedThreadPool(100);

Test test = new Test();
executorService.execute(()->{
test.ascll();
});
executorService.execute(()->{
test.number();
});

}

public void ascll(){
for (int i = 1; i < 100; i++) {
while (lock){

}
System.out.println((char)i);
lock = true;
}
}

public void number(){
for (int i = 1; i < 100; i++) {
while (!lock){

}
System.out.println(i);
lock = false;
}
}

}
方法三

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* Test1 class
*
* @author 蒋时华
* @date 2019/10/23
*/
public class Test {

ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();

public static void main(String[] args) {

ExecutorService executorService = Executors.newFixedThreadPool(100);

Test test = new Test();
executorService.execute(()->{
test.ascll();
});
executorService.execute(()->{
test.number();
});

}

public void ascll(){
for (int i = 1; i < 100; i++) {
try{
lock.lock();
System.out.println((char)i);
condition.signal();
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

public void number(){
for (int i = 1; i < 100; i++) {
try{
lock.lock();
System.out.println(i);
condition.signal();
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

}
方法四

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Test1 class
*
* @author 蒋时华
* @date 2019/10/23
*/
public class Test {

// CountDownLatch countDownLatch = new CountDownLatch(1);
CyclicBarrier cyclicBarrier = new CyclicBarrier(2);


public static void main(String[] args) {

ExecutorService executorService = Executors.newFixedThreadPool(100);
Test test = new Test();
executorService.execute(()->{
test.ascll();
});
executorService.execute(()->{
test.number();
});
}

public void ascll(){
for (int i = 1; i < 100; i++) {
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println((char)i);
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}

public void number(){
for (int i = 1; i < 100; i++) {
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(i);
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}

}
方法五

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
* Test1 class
*
* @author 蒋时华
* @date 2019/10/23
*/
public class Test {

Semaphore semaphore = new Semaphore(1, true);

public static void main(String[] args) {

ExecutorService executorService = Executors.newFixedThreadPool(100);
Test test = new Test();
executorService.execute(()->{
test.ascll();
});
executorService.execute(()->{
test.number();
});
}

public void ascll(){
// 保证另一个线程启动
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 1; i < 100; i++) {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println((char)i);

semaphore.release();
}
}

public void number(){
// 保证另一个线程启动
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 1; i < 100; i++) {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(i);
semaphore.release();
}
}


}

下一题

锁

发表于 2019-10-22 | 更新于 2019-10-23 | 分类于 Java | 评论数: | 阅读次数:

锁的种类

  • 乐观锁、悲观锁
  • 独享锁、共享锁
  • 互斥锁、读写锁
  • 可重入锁
  • 公平锁、非公平锁
  • 分段锁
  • 自旋锁
  • 偏向锁、轻量级锁、重量级锁

以上是一些锁的名词,这些分类并不是全是指锁的状态,有的指锁的特性,有的指锁的设计,下面总结的内容是对每个锁的名词进行一定的解释。

¶乐观锁、悲观锁

乐观锁与悲观锁并不是特指某两种类型的锁,是人们定义出来的概念或思想,主要是指看待并发同步的角度。

乐观锁:顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量,在Java中java.util.concurrent.atomic包下面的原子变量类就是使用了乐观锁的一种实现方式CAS(Compare and Swap 比较并交换)实现的。

悲观锁:总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它拿到锁。比如Java里面的同步原语synchronized关键字的实现就是悲观锁。

悲观锁适合写操作非常多的场景,乐观锁适合读操作非常多的场景,不加锁会带来大量的性能提升。

悲观锁在Java中的使用,就是利用各种锁。
乐观锁在Java中的使用,是无锁编程,常常采用的是CAS算法,典型的例子就是原子类,通过CAS自旋实现原子操作的更新。

¶乐观锁

乐观锁总是认为不存在并发问题,每次去取数据的时候,总认为不会有其他线程对数据进行修改,因此不会上锁。但是在更新时会判断其他线程在这之前有没有对数据进行修改,一般会使用“数据版本机制”或“CAS操作”来实现。

¶数据版本机制

实现数据版本一般有两种,第一种是使用版本号,第二种是使用时间戳。以版本号方式为例。

版本号方式:一般是在数据表中加上一个数据版本号version字段,表示数据被修改的次数,当数据被修改时,version值会加一。当线程A要更新数据值时,在读取数据的同时也会读取version值,在提交更新时,若刚才读取到的version值为当前数据库中的version值相等时才更新,否则重试更新操作,直到更新成功。
核心SQL代码:

1
update table set xxx=#{xxx}, version=version+1 where id=#{id} and version=#{version};

¶CAS操作

CAS(Compare and Swap 比较并交换),当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次尝试。

CAS操作中包含三个操作数——需要读写的内存位置(V)、进行比较的预期原值(A)和拟写入的新值(B)。如果内存位置V的值与预期原值A相匹配,那么处理器会自动将该位置值更新为新值B,否则处理器不做任何操作。

¶悲观锁

悲观锁认为对于同一个数据的并发操作,一定会发生修改的,哪怕没有修改,也会认为修改。因此对于同一份数据的并发操作,悲观锁采取加锁的形式。悲观的认为,不加锁并发操作一定会出问题。

在对任意记录进行修改前,先尝试为该记录加上排他锁(exclusive locking)
如果加锁失败,说明该记录正在被修改,那么当前查询可能要等待或者抛出异常。具体响应方式由开发者根据实际需要决定。
如果成功加锁,那么就可以对记录做修改,事务完成后就会解锁了。
期间如果有其他对该记录做修改或加排他锁的操作,都会等待我们解锁或直接抛出异常。

¶独享锁、共享锁

独享锁是指该锁一次只能被一个线程所持有。
对于Java ReentrantLock而言,就是独享锁。但是对于Lock的另一个实现类ReadWriteLock,其读锁是共享锁,其写锁是独享锁。
读锁的共享锁可保证并发读是非常高效的,读写,写读,写写的过程是互斥的。
独享锁与共享锁也是通过AQS来实现的,通过实现不同的方法,来实现独享或者共享。
对于Synchronized而言,当然是独享锁。

¶互斥锁、读写锁

上面讲的独享锁/共享锁就是一种广义的说法,互斥锁/读写锁就是具体的实现。
互斥锁在Java中的具体实现就是ReentrantLock。
读写锁在Java中的具体实现就是ReadWriteLock。

¶可重入锁

可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,在进入内层方法会自动获取锁。说的有点抽象,下面会有一个代码的示例。
对于Java ReetrantLock而言,从名字就可以看出是一个重入锁,其名字是Re entrant Lock 重新进入锁。
对于Synchronized而言,也是一个可重入锁。可重入锁的一个好处是可一定程度避免死锁。
如果锁具备可重入性,则称作为可重入锁。像synchronized和ReentrantLock都是可重入锁,可重入性在我看来实际上表明了锁的分配机制:基于线程的分配,而不是基于方法调用的分配。举个简单的例子,当一个线程执行到某个synchronized方法时,比如说method1,而在method1中会调用另外一个synchronized方法method2,此时线程不必重新去申请锁,而是可以直接执行方法method2。

看下面这段代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* LockTest class
*
* @author 蒋时华
* @date 2017/09/22
*/
public class LockTest {

public static void main(String[] args) {
final LockTest test = new LockTest();

new Thread(()->{
test.get();
}).start();

new Thread(()->{
test.get();
}).start();

}

public synchronized void get() {
method2();
}

public synchronized void method2() {

}

}

你认为结果会是怎样的呢,会不会造成死锁呢?

上述代码中的两个方法method1和method2都用synchronized修饰了,假如某一时刻,线程A执行到了method1,此时线程A获取了这个对象的锁,而由于method2也是synchronized方法,假如synchronized不具备可重入性,此时线程A需要重新申请锁。但是这就会造成一个问题,因为线程A已经持有了该对象的锁,而又在申请获取该对象的锁,这样就会线程A一直等待永远不会获取到的锁。
而由于synchronized和Lock都具备可重入性,所以不会发生上述现象

¶公平锁、非公平锁

公平锁是指多个线程按照申请锁的顺序来获取锁。
非公平锁是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁。有可能,会造成优先级反转或者饥饿现象。
对于Java ReetrantLock而言,通过构造函数指定该锁是否是公平锁,默认是非公平锁。非公平锁的优点在于吞吐量比公平锁大。
对于Synchronized而言,也是一种非公平锁。由于其并不像ReentrantLock是通过AQS的来实现线程调度,所以并没有任何办法使其变成公平锁。
公平锁即尽量以请求锁的顺序来获取锁。比如同是有多个线程在等待一个锁,当这个锁被释放时,等待时间最久的线程(最先请求的线程)会获得该所,这种就是公平锁。

非公平锁即无法保证锁的获取是按照请求锁的顺序进行的。这样就可能导致某个或者一些线程永远获取不到锁。
在Java中,synchronized就是非公平锁,它无法保证等待的线程获取锁的顺序。
而对于ReentrantLock和ReentrantReadWriteLock,它默认情况下是非公平锁,但是可以设置为公平锁。

在ReentrantLock中定义了2个静态内部类,一个是NotFairSync,一个是FairSync,分别用来实现非公平锁和公平锁。
我们可以在创建ReentrantLock对象时,通过以下方式来设置锁的公平性:

1
ReentrantLock lock = new ReentrantLock(true);


另外在ReentrantLock类中定义了很多方法,比如:

  • isFair() //判断锁是否是公平锁
  • isLocked() //判断锁是否被任何线程获取了
  • isHeldByCurrentThread() //判断锁是否被当前线程获取了
  • hasQueuedThreads() //判断是否有线程在等待该锁

在ReentrantReadWriteLock中也有类似的方法,同样也可以设置为公平锁和非公平锁。不过要记住,ReentrantReadWriteLock并未实现Lock接口,它实现的是ReadWriteLock接口。

¶分段锁

分段锁其实是一种锁的设计,并不是具体的一种锁,对于ConcurrentHashMap而言,其并发的实现就是通过分段锁的形式来实现高效的并发操作。
我们以ConcurrentHashMap来说一下分段锁的含义以及设计思想,ConcurrentHashMap中的分段锁称为Segment,它即类似于HashMap(JDK7和JDK8中HashMap的实现)的结构,即内部拥有一个Entry数组,数组中的每个元素又是一个链表;同时又是一个ReentrantLock(Segment继承了ReentrantLock)。
当需要put元素的时候,并不是对整个hashmap进行加锁,而是先通过hashcode来知道他要放在哪一个分段中,然后对这个分段进行加锁,所以当多线程put的时候,只要不是放在一个分段中,就实现了真正的并行的插入。
但是,在统计size的时候,可就是获取hashmap全局信息的时候,就需要获取所有的分段锁才能统计。
分段锁的设计目的是细化锁的粒度,当操作不需要更新整个数组的时候,就仅仅针对数组中的一项进行加锁操作。

¶自旋锁

在Java中,自旋锁是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU。比如 while(true){}

¶偏向锁、轻量级锁、重量级锁

这三种锁是指锁的状态,并且是针对Synchronized。在Java 5通过引入锁升级的机制来实现高效Synchronized。这三种锁的状态是通过对象监视器在对象头中的字段来表明的。
偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁。降低获取锁的代价。
轻量级锁是指当锁是偏向锁的时候,被另一个线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,提高性能。
重量级锁是指当锁为轻量级锁的时候,另一个线程虽然是自旋,但自旋不会一直持续下去,当自旋一定次数的时候,还没有获取到锁,就会进入阻塞,该锁膨胀为重量级锁。重量级锁会让他申请的线程进入阻塞,性能降低。

储备知识

¶AQS

AbstractQueuedSynchronized 抽象队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch…

AQS维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。

state的访问方式有三种:

1
2
3
getState()
setState()
compareAndSetState()

AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

1
2
3
4
5
isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其他线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多少次,这样才能保证state是能回到零态的。

再以CountDownLatch为例,任务分为N个子线程去执行,state为初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会await()函数返回,继续后余动作。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

¶CAS

CAS(Compare and Swap 比较并交换)是乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其他线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次尝试。
CAS操作中包含三个操作数——需要读写的内存位置(V)、进行比较的预期原值(A)和拟写入的新值(B)。如果内存位置V的值与预期原值A相匹配,那么处理器会自动将该位置值更新为新值B,否则处理器不做任何操作。无论哪种情况,它都会在CAS指令之前返回该位置的值(在CAS的一些特殊情况下将仅返回CAS是否成功,而不提取当前值)。CAS有效地说明了“我认为位置V应该包含值A;如果包含该值,则将B放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可”。这其实和乐观锁的冲突检查+数据更新的原理是一样的。

JAVA对CAS的支持:
在JDK1.5中新增java.util.concurrent包就是建立在CAS之上的。相对于synchronized这种阻塞算法,CAS是非阻塞算法的一种常见实现。所以java.util.concurrent包中的AtomicInteger为例,看一下在不使用锁的情况下是如何保证线程安全的。主要理解getAndIncrement方法,该方法的作用相当于++i操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class AtomicInteger extends Number implements java.io.Serializable{
  private volatile int value;
  public final int get(){
    return value;
  }

  public final int getAndIncrement(){
    for (;;){
      int current = get();
      int next = current + 1;
      if (compareAndSet(current, next))
      return current;
    }
  }

  public final boolean compareAndSet(int expect, int update){
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
  }
}

CompletableFuture

发表于 2019-10-22 | 更新于 2019-10-23 | 分类于 Java | 评论数: | 阅读次数:

简介

¶异步计算

所谓异步调用其实就是实现一个可无需等待被调用函数的返回值而让操作继续运行的方法。在 Java 语言中,简单的讲就是另启一个线程来完成调用中的部分计算,使调用继续运行或返回,而不需要等待计算结果。但调用者仍需要取线程的计算结果。

JDK5新增了Future接口,用于描述一个异步计算的结果。虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果。

以前我们获取一个异步任务的结果可能是这样写的:

¶Future 接口的局限性

Future接口可以构建异步应用,但依然有其局限性。它很难直接表述多个Future 结果之间的依赖性。实际开发中,我们经常需要达成以下目的:

  • 将多个异步计算的结果合并成一个
  • 等待Future集合中的所有任务都完成
  • Future完成事件(即,任务完成以后触发执行动作)

¶CompletionStage

  • CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
  • 一个阶段的计算执行可以是一个Function,Consumer或者Runnable。比如:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
  • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发

¶CompletableFuture

  • 在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法
  • 它可能代表一个明确完成的Future,也有可能代表一个完成阶段( CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作。
  • 它实现了Future和CompletionStage接口

¶实例代码

¶基本的CompletableFuter使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class BaseComFuture {

public static void main(String[] args) throws InterruptedException, ExecutionException {


long start = System.nanoTime();
//
// System.out.println("get start");
// TimeUnit.SECONDS.sleep(3);
// TimeUnit.SECONDS.sleep(3);
// TimeUnit.SECONDS.sleep(3);
// System.out.println(456);
//
// System.out.println(System.nanoTime()-start);

/*
* 新建一个CompletableFuture对象
*/
start = System.nanoTime();
CompletableFuture<String> resultCompletableFuture = CompletableFuture.supplyAsync(()->{
try {
System.out.println("get start1");
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
}

return "Hello CompletableFuture";
});
// CompletableFuture<String> resultCompletableFuture1 = CompletableFuture.supplyAsync(()->{
// try {
// System.out.println("get start2");
// TimeUnit.SECONDS.sleep(3);
// System.out.println(Thread.currentThread().getName());
// } catch (Exception e) {
// e.printStackTrace();
// }
//
// return "Hello CompletableFuture";
// });
// CompletableFuture<String> resultCompletableFuture2 = CompletableFuture.supplyAsync(()->{
// try {
// System.out.println("get start3");
// TimeUnit.SECONDS.sleep(5);
// System.out.println(Thread.currentThread().getName());
// } catch (Exception e) {
// e.printStackTrace();
// }
//
// return "Hello CompletableFuture";
// });
TimeUnit.SECONDS.sleep(1);
System.out.println(123);
System.out.println(resultCompletableFuture.get());
System.out.println("aaaaaaa");
// System.out.println(resultCompletableFuture1.get());
System.out.println("aaaaaaa");
// System.out.println(resultCompletableFuture2.get());
System.out.println("aaaaaaa");
System.out.println(456);
System.out.println(System.nanoTime()-start);
/***
* ps : 首先会进入该进程,执行get方法 //所以输出结果中立马会输出get start 然后,sleep模拟长时间计算操作/或者其他情况的阻塞
* //所以这个时候屏幕会等三秒,一直停在那里 3秒后才会打印 // 三秒后,才会开始打印相关信息,按照顺序执行
* (不要被try语句迷惑了啊,那就是一个一瞬执行下来的逻辑) 打印完后该进程退出!
*
* 如果我用resultCompletableFuture的回调函数去处理这个会有什么现象呢? 请看BaseComFutureCallback类中的演示
*/

/**
* 上面那个打印语句会阻塞3秒,执行完后 ,才会执行这一句
*/

}

}

¶回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class BaseComFutureCallback {
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(5);
/*
* 新建一个CompletableFuture对象
*/
CompletableFuture<String> resultCompletableFuture = CompletableFuture.supplyAsync(()->{
try {
System.out.println("get start,will sleep 3s");
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}

return "Hello CompletableFuture";
}, executor);

CompletableFuture<String> resultCompletableFuture2 = CompletableFuture.supplyAsync(()->{
try {
System.out.println("get start,will sleep 3s");
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}

return "Hello CompletableFuture";
}, executor);

/**
* ps: 回调函数,顾名思义 那就是调用的对象中声明的方法正确执行完后,就会调用这个方法。
* 其中,accept中的参数就是get函数返回的参数
*/
System.out.println(resultCompletableFuture.thenAccept((t)->{
System.out.println("进入回调函数-" + t);
System.out.println(Thread.currentThread().getName());
}));
System.out.println(resultCompletableFuture2.thenAccept((t)->{
System.out.println("进入回调函数-" + t);
System.out.println(Thread.currentThread().getName());
}));

System.out.println("带有回调的print语句后面一句话");
System.out.println("");

/**
* it will shutdown 10's later
*/
System.out.println("it will shutdown 10's later");
TimeUnit.SECONDS.sleep(10);

executor.shutdown();
System.out.println("shutdown");
}
}
/**
* ps: 两点
* 1.这个时候你会看到 ,在带有回调的print语句后的一个print会先打印(因为那个回调在等着get函数里面sleep3秒钟),
* 但是,不会阻塞着等,而是会执行下面的语句,然后,上面的语句执行完后,再来执行这个带有回调的print
* 2.当打印线程的时候,我们可以看到,回调与get是统一个线程!
* 因为我们用的thenAccept去做回调,
* 那么,当我们尝试用带Async的回调方法去回调试试看
* 详见BaseComFutureCallbackAsync
*
*
* 补充:
* 这里与上一个不同,因为这个地方加了个executor,为什么?
* 由于回调,resultCompletableFuture中的get不知道会阻塞到什么时候
* (虽然现在写死的是3秒,但是生产环境中 ,哪个能保证3秒?)
* 但是,这个地方他又不会像上一个程序那样一顺执行,非得等阻塞完成再执行下一句
* 而是马上执行下面的print,当main中所有语句顺序执行完了之后,这个main线程就关闭了,
* 所以,你的回调永远不会执行,因为主线程都down掉了。你咋回来。(可惜啊,main不会等,等了不还是阻塞)
*
* 所以,这里我先起一个executor,用来开一个线程池,如果不显式关闭,
* 他就会一直挂在那,既然一直挂在那里,我显然,会等到我回调的那天(get必然会返回值,只是时间长短啊)
*
* 所以,哪怕你main语句执行完了,但是我只要不关闭这个线程池,你的main就一直跟我挂在那,因为还有一句话没有执行完嘛.
*
* BaseComFutureCallback2中对这个的演示会更明显
*
*/

¶异常通知的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class BaseComFutureExceptionally2 {

public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(5);
/*
* 新建一个CompletableFuture对象
*/
CompletableFuture<String> resultCompletableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
System.out.println("get start,will sleep 3s");
TimeUnit.SECONDS.sleep(3);
throw new RuntimeException("错误");
// System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}

return "Hello CompletableFuture";
}
}, executor);

System.out.println(resultCompletableFuture.thenAccept(new Consumer<String>() {
@Override
public void accept(String t) {
System.out.println("进入回调函数-" + t);
throw new RuntimeException("aaaaa");
// System.out.println(Thread.currentThread().getName());
}

}).exceptionally(new Function<Throwable, Void>() {

/**
* ps
* 当出现异常的的时候,会执行这个function
* 这里就是对异常进行一些处理
*/
@Override
public Void apply(Throwable t) {
System.out.println(t.getMessage());
return null;
}
}));

/**
* ps: 我们返现,其余的流程都是一样的,就是回调函数不再执行了!
* 任务还是会执行完成(只要主线程等待足够时间再结束,或者不结束 )
* 但是,我们的回调函数,一般应用中肯定是希望执行的(既然是回调,我肯定希望在目标方法执行完后,进行一些处理工作),哪
* 怕是你报错,我也要指导你报错了,你完全不执行,显然是不合理的
*
* 但是我们实际应用中坑定不是通过
* resultCompletableFuture.completeExceptionally(new Exception("error"));
* 这个方法来抛出异常,肯定是在get中执行的时候出现异常然后抛出
* 这种情况详见BaseComFutureExceptionally2
*/
resultCompletableFuture.completeExceptionally(new RuntimeException("error"));

System.out.println("it will shutdown 10's later");
TimeUnit.SECONDS.sleep(10);

executor.shutdown();

System.out.println("time out ,main shutdown now");
}

}
/**
* 我们往往不仅需要通过回调的方式,让线程不阻塞
* 我们还需要将这些回调操作串起来
* 接着请看下面的转换
* 需要用到apply函数组
*
*
* public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
* public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
* public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
* 当原来的CompletableFuture计算完后,
* 将结果传递给函数fn,
* 将fn的结果作为新的CompletableFuture的参数去参与运算。
* 因此它的功能相当于将CompletableFuture<T>转换成CompletableFuture<U>
*
* 持续这么循环的调下去,有点像递归,当然,递归是本函数的持续调用(直到递归条件不满足)
*
*
*/

¶我们不仅仅回调,还可以将这些回调 操作串起来

需要用到apply函数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import java.util.concurrent.*;
import java.util.function.Function;

public class ConvertComFuture {

public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
// throw new RuntimeException("错误");
System.out.println("aaaaa");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "zero";
}, executor);

CompletableFuture<Integer> f2 = f1.thenApply(new Function<String, Integer>() {

@Override
public Integer apply(String t) {
System.out.println("进入f2的apply方法");
System.out.println("f1传进来的字符串-"+t);
System.out.println("返回该字符串的长度-"+Integer.valueOf(t.length()));

try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Integer.valueOf(t.length());
}
}).exceptionally(new Function<Throwable, Integer>() {

@Override
public Integer apply(Throwable t) {
System.out.println(t.getMessage());
return null;
}

});
System.out.println("bbbbb");

/**
* 此处,apply的是f1
*/

CompletableFuture<Double> f3 = f2.thenApply(self -> self * 2.0).thenApply(self -> self*3).exceptionally(new Function<Throwable, Double>() {
@Override
public Double apply(Throwable t) {
System.out.println(t.getMessage());
return null;
}
});//这个参数名字随便取,叫self最合适,因为他本来就是把f2自身的结果带到f3中去,参与f3的运算
/**
* 此处,apply的是f2
*
* 这个地方的self,不严格的说,就是一个的代名词,就是代表了f2的返回值
*
* 如果想要看的清楚一点,可以像f1一样显式声明一个function匿名对象,覆盖apply方法,然后写逻辑
* 其中,方括号前面的参数是传进来的参数类型,后面的参数类型是返回类型
*
* 不过,我们一般采用f2的方式,更简洁一点
*
* 这里需要注意的是,f3获得最终结果还真不会马上执行,也不会导致主进程阻塞
* 而是等着这里面所有的“回调”阶段一个接一个的完成后,再显示出来(或者说再进入f3的执行执行逻辑)。
*
* 但是,这个方法一旦有异常,就会抛出
*/
//System.out.println(f3.get());


System.out.println("shutdown in 3s");
TimeUnit.SECONDS.sleep(3);

System.out.println("shutdown");
executor.shutdown();
}

}

NIO、BIO、AIO网络通信

发表于 2019-09-10 | 更新于 2019-10-23 | 分类于 Java | 评论数: | 阅读次数:

一、概念

¶1、同步和异步

¶同步:

用户线程发起I/O请求后需要等待或者轮询内核I/O操作完成后才能继续执行

¶异步:

用户线程发起I/O请求后仍需要继续执行,当内核I/O操作完成后会通知用户线程,或者调用用户线程注册的回调函数

¶2、阻塞和非阻塞

讨论的是参与通信双方的工作机制,是否需要互相等待对方的执行

¶阻塞:

在通信过程中,
一方在处理通信,
另一方要等待对方执行并返回信息不能去做其他无关的事

¶非阻塞:

在通信过程中,
一方在处理通信,
另一方可以不用等待执行并返回信息而可以去做其他无关的事 直到对方处理通信完成 再在适合的时候继续处理通信过程

二、BIO (同步阻塞)

¶代码示例

服务端业务代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

/**
* 客户端消息处理线程ServerHandler
*
* @author 蒋时华
* @date 2017/6/24
*/
public class ServerHandler implements Runnable{
private Socket socket;
public ServerHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try{
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(),true);
String expression;
String result;
while(true){
//通过BufferedReader读取一行
//如果已经读到输入流尾部,返回null,退出循环
//如果得到非空值,就尝试计算结果并返回
if((expression = in.readLine())==null) {
break;
}
System.out.println("server阻塞测试");
System.out.println("服务器收到消息:" + expression);
try{
result = Calculator.cal(expression).toString();
}catch(Exception e){
result = "计算错误:" + e.getMessage();
}
out.println(result);
}
}catch(Exception e){
e.printStackTrace();
}finally{
//一些必要的清理工作
if(in != null){
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
in = null;
}
if(out != null){
out.close();
out = null;
}
if(socket != null){
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
socket = null;
}
}
}
}

final class Calculator {
private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript");
public static Object cal(String expression) throws ScriptException {
return jse.eval(expression);
}
}

服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ServerNormal {

//默认的端口号
private static int DEFAULT_PORT = 12344;
//单例的ServerSocket
private static ServerSocket server;

//线程池 懒汉式的单例
private static ExecutorService executorService = Executors.newFixedThreadPool(1);

//根据传入参数设置监听端口,如果没有参数调用以下方法并使用默认值
public static void start() throws IOException {
//使用默认值
start(DEFAULT_PORT);
}
//这个方法不会被大量并发访问,不太需要考虑效率,直接进行方法同步就行了
public synchronized static void start(int port) throws IOException{
if(server != null) return;
try{
//通过构造函数创建ServerSocket
//如果端口合法且空闲,服务端就监听成功
server = new ServerSocket(port);
System.out.println("服务器已启动,端口号:" + port);
//通过无线循环监听客户端连接
while(true){
//如果没有客户端接入,将阻塞在accept操作上。
Socket socket = server.accept();
//当有新的客户端接入时,会执行下面的代码
//然后创建一个新的线程处理这条Socket链路
// TODO: 2019/8/28 问题所在
// new Thread(new ServerHandler(socket)).start();
executorService.execute(new ServerHandler(socket));
}
}finally{
//一些必要的清理工作
if(server != null){
System.out.println("服务器已关闭。");
server.close();
server = null;
}
}
}


}

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

/**
* 同步阻塞式I/O创建的Client源码
*
* @author 蒋时华
* @date 2017/6/24
*/
public class Client {

//默认的端口号
private static int DEFAULT_SERVER_PORT = 12344;
private static String DEFAULT_SERVER_IP = "127.0.0.1";
public static void send(String expression){
send(DEFAULT_SERVER_PORT,expression);
}
public static void send(int port,String expression){
System.out.println("算术表达式为:" + expression);
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try{
socket = new Socket(DEFAULT_SERVER_IP,port);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(),true);
out.println(expression);
System.out.println("___结果为:" + in.readLine());
System.out.println("client阻塞测试");
}catch(Exception e){
e.printStackTrace();
}finally{
//一下必要的清理工作
if(in != null){
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
in = null;
}
if(out != null){
out.close();
out = null;
}
if(socket != null){
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
socket = null;
}
}
}
}

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.io.IOException;
import java.util.Random;
/**
* Test class
*
* @author 蒋时华
* @date 2017/6/24
*/
public class BIOTest {

//测试主方法
public static void main(String[] args) throws InterruptedException {
//运行服务器
new Thread(() -> {
try {
ServerNormal.start();
} catch (IOException e) {
e.printStackTrace();
}
}).start();

//避免客户端先于服务器启动前执行代码
Thread.sleep(100);
//运行客户端
char operators[] = {'+','-','*','/'};
Random random = new Random(System.currentTimeMillis());
new Thread(() -> {
while(true){
//随机产生算术表达式
String expression = random.nextInt(10)+""+operators[random.nextInt(4)]+(random.nextInt(10)+1);
Client.send(expression);
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}

¶问题

同步阻塞式I/O创建的Server

¶结构图

¶1、BIO主要的问题在于每当有一个新的客户端请求接入时,服务端必须创建一个新的线程来处理这条链路,在需要满足高性能、高并发的场景是没法应用的(大量创建新的线程会严重影响服务器性能,甚至罢工)

¶1、限制了线程数量,如果发生大量并发请求,超过最大数量的线程就只能等待,直到线程池中的有空闲的线程可以被复用。而对Socket的输入流进行读取时,会一直阻塞

所以在读取数据较慢时(比如数据量大、网络传输慢等),大量并发的情况下,其他接入的消息,只能一直等待,这就是最大的弊端。而NIO,就能解决这个难题

三、NIO

¶代码示例

服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Server {
private static int DEFAULT_PORT = 12345;
private static ServerHandle serverHandle;
public static void start(){
start(DEFAULT_PORT);
}
public static synchronized void start(int port){
if(serverHandle!=null) {
serverHandle.stop();
}
serverHandle = new ServerHandle(port);
new Thread(serverHandle,"Server").start();
}
}

class ServerHandle implements Runnable{
private Selector selector;
private ServerSocketChannel serverChannel;
private volatile boolean started;
/**
* 构造方法
* @param port 指定要监听的端口号
*/
public ServerHandle(int port) {
try{
//创建选择器
selector = Selector.open();
//打开监听通道
serverChannel = ServerSocketChannel.open();
//如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
serverChannel.configureBlocking(false);//开启非阻塞模式
//绑定端口 backlog设为1024
serverChannel.socket().bind(new InetSocketAddress(port),1024);
//监听客户端连接请求
// 如果你对不止一种事件感兴趣,使用或运算符即可,如下:
// int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
//标记服务器已开启
started = true;
System.out.println("服务器已启动,端口号:" + port);
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
}
public void stop(){
started = false;
}
@Override
public void run() {
//循环遍历selector
while(started){
try{
// 阻塞,阻塞到至少有一个通道在你注册的事件上就绪了。
// 这个过程可能会造成调用线程进入阻塞状态, 通过调用Selector对象的wakeup()方法让处在阻塞状态的select()方法立刻返回
// wakeup()该方法使得选择器上的第一个还没有返回的选择操作立即返回。如果当前没有进行中的选择操作,那么下一次对select()方法的一次调用将立即返回。
// selector.select();
// 和select()一样,但最长阻塞时间为timeout毫秒。
// selector.select(1000);
// 非阻塞,只要有通道就绪就立刻返回。
selector.selectNow();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key = null;
while(it.hasNext()){
key = it.next();
it.remove();
try{
handleInput(key);
}catch(Exception e){
if(key != null){
key.cancel();
if(key.channel() != null){
key.channel().close();
}
}
}
}
}catch(Throwable t){
t.printStackTrace();
}
}
//selector关闭后会自动释放里面管理的资源
if(selector != null) {
try{
selector.close();
}catch (Exception e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws IOException{
if(key.isValid()){
//处理新接入的请求消息
if(key.isAcceptable()){
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//通过ServerSocketChannel的accept创建SocketChannel实例
//完成该操作意味着完成TCP三次握手,TCP物理链路正式建立
SocketChannel sc = ssc.accept();
//设置为非阻塞的
// 神奇的事情,blocking默认是true
// TODO: 2019/8/28
sc.configureBlocking(false);
//注册为读
sc.register(selector, SelectionKey.OP_READ);
}
//读消息
if(key.isReadable()){
SocketChannel sc = (SocketChannel) key.channel();
//创建ByteBuffer,并开辟一个1M的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取请求码流,返回读取到的字节数
int readBytes = sc.read(buffer);
//读取到字节,对字节进行编解码
if(readBytes>0){
//将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
buffer.flip();
//根据缓冲区可读字节数创建字节数组
byte[] bytes = new byte[buffer.remaining()];
//将缓冲区可读字节数组复制到新建的数组中
buffer.get(bytes);
String expression = new String(bytes,"UTF-8");
System.out.println("服务器收到消息:" + expression);
//处理数据
String result = null;
try{
result = Calculator.cal(expression).toString();
}catch(Exception e){
result = "计算错误:" + e.getMessage();
}
//发送应答消息
doWrite(sc,result);
}
//没有读取到字节 忽略
// else if(readBytes==0);
//链路已经关闭,释放资源
else if(readBytes<0){
key.cancel();
sc.close();
}
}
}
}
//异步发送应答消息
private void doWrite(SocketChannel channel,String response) throws IOException {
//将消息编码为字节数组
byte[] bytes = response.getBytes();
//根据数组容量创建ByteBuffer
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
//将字节数组复制到缓冲区
writeBuffer.put(bytes);
//flip操作
writeBuffer.flip();
//发送缓冲区的字节数组
channel.write(writeBuffer);
//****此处不含处理“写半包”的代码
}
}

final class Calculator {
private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript");
public static Object cal(String expression) throws ScriptException {
return jse.eval(expression);
}
}

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Client {
private static String DEFAULT_HOST = "127.0.0.1";
private static int DEFAULT_PORT = 12345;
private static ClientHandle clientHandle;
public static void start(){
start(DEFAULT_HOST,DEFAULT_PORT);
}
public static synchronized void start(String ip,int port){
if(clientHandle!=null)
clientHandle.stop();
clientHandle = new ClientHandle(ip,port);
new Thread(clientHandle,"Server").start();
}
//向服务器发送消息
public static boolean sendMsg(String msg) throws Exception{
if(msg.equals("q")) return false;
clientHandle.sendMsg(msg);
return true;
}
}

class ClientHandle implements Runnable{
private String host;
private int port;
private Selector selector;

private SocketChannel socketChannel;

private volatile boolean started;

public ClientHandle(String ip,int port) {
this.host = ip;
this.port = port;
try{
//创建选择器
selector = Selector.open();
//打开监听通道
socketChannel = SocketChannel.open();
//如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
socketChannel.configureBlocking(false);//开启非阻塞模式
started = true;
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
}
public void stop(){
started = false;
}
@Override
public void run() {
try{
doConnect();
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
//循环遍历selector
while(started){
try{
//无论是否有读写事件发生,selector每隔1s被唤醒一次
selector.select(1000);
//阻塞,只有当至少一个注册的事件发生的时候才会继续.
// selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key = null;
while(it.hasNext()){
key = it.next();
it.remove();
try{
handleInput(key);
}catch(Exception e){
if(key != null){
key.cancel();
if(key.channel() != null){
key.channel().close();
}
}
}
}
}catch(Exception e){
e.printStackTrace();
System.exit(1);
}
}
//selector关闭后会自动释放里面管理的资源
if(selector != null)
try{
selector.close();
}catch (Exception e) {
e.printStackTrace();
}
}
private void handleInput(SelectionKey key) throws IOException{
if(key.isValid()){
SocketChannel sc = (SocketChannel) key.channel();
if(key.isConnectable()){
if(sc.finishConnect());
else System.exit(1);
}
//读消息
if(key.isReadable()){
//创建ByteBuffer,并开辟一个1M的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取请求码流,返回读取到的字节数
int readBytes = sc.read(buffer);
//读取到字节,对字节进行编解码
if(readBytes>0){
//将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
buffer.flip();
//根据缓冲区可读字节数创建字节数组
byte[] bytes = new byte[buffer.remaining()];
//将缓冲区可读字节数组复制到新建的数组中
buffer.get(bytes);
String result = new String(bytes,"UTF-8");
System.out.println("客户端收到消息:" + result);
}
//没有读取到字节 忽略
// else if(readBytes==0);
//链路已经关闭,释放资源
else if(readBytes<0){
key.cancel();
sc.close();
}
}
}
}
//异步发送消息
private void doWrite(SocketChannel channel,String request) throws IOException {
//将消息编码为字节数组
byte[] bytes = request.getBytes();
//根据数组容量创建ByteBuffer
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
//将字节数组复制到缓冲区
writeBuffer.put(bytes);
//flip操作
writeBuffer.flip();
//发送缓冲区的字节数组
channel.write(writeBuffer);
//****此处不含处理“写半包”的代码
// TODO: 2019/8/28 因为应答消息的发送,
// SocketChannel也是异步非阻塞的,
// 所以不能保证一次能吧需要发送的数据发送完,
// 此时就会出现写半包的问题。我们需要注册写操作,
// 不断轮询Selector将没有发送完的消息发送完毕,
// 然后通过Buffer的hasRemain()方法判断消息是否发送完成。
}
private void doConnect() throws IOException{
if(socketChannel.connect(new InetSocketAddress(host,port)));
else socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
public void sendMsg(String msg) throws Exception{
socketChannel.register(selector, SelectionKey.OP_WRITE);
doWrite(socketChannel, msg);

}
}

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import java.util.Random;

/**
* Test class
*
* @author 蒋时华
* @date 2017/6/24
*/
public class NIOTest {

public static void main(String[] args) throws Exception{
//运行服务器
Server.start();
//避免客户端先于服务器启动前执行代码
Thread.sleep(1000);
//运行客户端
Client.start();
Thread.sleep(3000);

Random random = new Random(System.currentTimeMillis());
char operators[] = {'+','-','*','/'};
// String expression = random.nextInt(10)+""+operators[random.nextInt(4)]+(random.nextInt(10)+1);
// while(Client.sendMsg(new Scanner(System.in).nextLine()));
while(
Client.sendMsg(random.nextInt(10)+""+operators[random.nextInt(4)]+(random.nextInt(10)+1))){
Thread.sleep(random.nextInt(1000));
};
}

}

¶nio 结构

¶服务端
  • 打开ServerSocketChannel,监听客户端连接
  • 绑定监听端口,设置连接为非阻塞模式
  • 创建Reactor线程,创建多路复用器并启动线程
  • 将ServerSocketChannel注册到Reactor线程中的Selector上,监听ACCEPT事件
  • Selector轮询准备就绪的key
  • Selector监听到新的客户端接入,处理新的接入请求,完成TCP三次握手,建立链路
¶客户端
  • 设置客户端链路为非阻塞模式
  • 将新接入的客户端连接注册到Reactor线程的Selector上,监听读操作,读取客户端发送的网络消息
  • 异步读取客户端消息到缓冲区
  • 对Buffer编解码,处理半包消息,将解码成功的消息封装成Task
  • 将应答消息编码为Buffer,调用SocketChannel的write将消息异步发送给客户端
¶Selector(多路复用器|选择某个通道器)

选择器类管理着一个被注册的通道集合的信息和它们的就绪状态。
通道是和选择器一起被注册的,并且使用选择器来更新通道的就绪状态,
当这么做的时候,可以选择将被激发的线程挂起直到有就绪的通道。
使用Selector的好处在于: 使用更少的线程来就可以来处理通道了, 相比使用多个线程,避免了线程上下文切换带来的开销。

¶SelectionKey

表示了一个特定的通道对象和一个特定的选择器对象之间的注册关系。
key.attachment(); //返回SelectionKey的attachment,attachment可以在注册channel的时候指定。
key.channel(); // 返回该SelectionKey对应的channel。
key.selector(); // 返回该SelectionKey对应的Selector。
key.interestOps(); //返回代表需要Selector监控的IO操作的bit mask
key.readyOps(); // 返回一个bit mask,代表在相应channel上可以进行的IO操作。

事件名对应值
服务端接收客户端连接事件SelectionKey.OP_ACCEPT(16)
客户端连接服务端事件SelectionKey.OP_CONNECT(8)
读事件SelectionKey.OP_READ(1)
写事件SelectionKey.OP_WRITE(4)

¶buffer(解决bio中数据不可重复读的问题)

存储基本类型数组数据:ByteBuffer、CharBuffer、FloatBuffer、ShortBuffer、StringCharBuffer等等
这些方法中大部分是对mark、position、limit、capacity的操作。
对于数组来说,需要以下一些重要元素,比如数组大小(capacity)
此时如果是对数组的读取操作时,需要表明当前读到了哪个位置(position),总共可以读到哪个位置(limit),也就是当前数组中有几个元素。
此时如果是写操作,那么需要知道现在写到了哪个位置(position),最大可以写到哪个位置(limit)
最后为了实现可重复读,产生一个备忘位置,即标记(mark)。
源码(只截取部分):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
public abstract class Buffer {

// ...
// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
// ...

// Creates a new buffer with the given mark, position, limit, and capacity,
// after checking invariants.
//
Buffer(int mark, int pos, int lim, int cap) { // package-private
if (cap < 0)
throw new IllegalArgumentException("Negative capacity: " + cap);
this.capacity = cap;
limit(lim);
position(pos);
if (mark >= 0) {
if (mark > pos)
throw new IllegalArgumentException("mark > position: ("
+ mark + " > " + pos + ")");
this.mark = mark;
}
}

/**
* Returns this buffer's capacity.
*
* @return The capacity of this buffer
*/
public final int capacity() {
return capacity;
}

/**
* Returns this buffer's position.
*
* @return The position of this buffer
*/
public final int position() {
return position;
}

/**
* Sets this buffer's position. If the mark is defined and larger than the
* new position then it is discarded.
*
* @param newPosition
* The new position value; must be non-negative
* and no larger than the current limit
*
* @return This buffer
*
* @throws IllegalArgumentException
* If the preconditions on <tt>newPosition</tt> do not hold
*/
public final Buffer position(int newPosition) {
if ((newPosition > limit) || (newPosition < 0))
throw new IllegalArgumentException();
position = newPosition;
if (mark > position) mark = -1;
return this;
}

/**
* Returns this buffer's limit.
*
* @return The limit of this buffer
*/
public final int limit() {
return limit;
}

/**
* Sets this buffer's limit. If the position is larger than the new limit
* then it is set to the new limit. If the mark is defined and larger than
* the new limit then it is discarded.
*
* @param newLimit
* The new limit value; must be non-negative
* and no larger than this buffer's capacity
*
* @return This buffer
*
* @throws IllegalArgumentException
* If the preconditions on <tt>newLimit</tt> do not hold
*/
public final Buffer limit(int newLimit) {
if ((newLimit > capacity) || (newLimit < 0))
throw new IllegalArgumentException();
limit = newLimit;
if (position > limit) position = limit;
if (mark > limit) mark = -1;
return this;
}

/**
* Sets this buffer's mark at its position.
*
* @return This buffer
*/
public final Buffer mark() {
mark = position;
return this;
}

/**
* Resets this buffer's position to the previously-marked position.
*
* <p> Invoking this method neither changes nor discards the mark's
* value. </p>
*
* @return This buffer
*
* @throws InvalidMarkException
* If the mark has not been set
*/
public final Buffer reset() {
int m = mark;
if (m < 0)
throw new InvalidMarkException();
position = m;
return this;
}

/**
* Clears this buffer. The position is set to zero, the limit is set to
* the capacity, and the mark is discarded.
*
* <p> Invoke this method before using a sequence of channel-read or
* <i>put</i> operations to fill this buffer. For example:
*
* <blockquote><pre>
* buf.clear(); // Prepare buffer for reading
* in.read(buf); // Read data</pre></blockquote>
*
* <p> This method does not actually erase the data in the buffer, but it
* is named as if it did because it will most often be used in situations
* in which that might as well be the case. </p>
*
* @return This buffer
*/
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}

/**
* Flips this buffer. The limit is set to the current position and then
* the position is set to zero. If the mark is defined then it is
* discarded.
*
* <p> After a sequence of channel-read or <i>put</i> operations, invoke
* this method to prepare for a sequence of channel-write or relative
* <i>get</i> operations. For example:
*
* <blockquote><pre>
* buf.put(magic); // Prepend header
* in.read(buf); // Read data into rest of buffer
* buf.flip(); // Flip buffer
* out.write(buf); // Write header + data to channel</pre></blockquote>
*
* <p> This method is often used in conjunction with the {@link
* java.nio.ByteBuffer#compact compact} method when transferring data from
* one place to another. </p>
*
* @return This buffer
*/
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}

/**
* Rewinds this buffer. The position is set to zero and the mark is
* discarded.
*
* <p> Invoke this method before a sequence of channel-write or <i>get</i>
* operations, assuming that the limit has already been set
* appropriately. For example:
*
* <blockquote><pre>
* out.write(buf); // Write remaining data
* buf.rewind(); // Rewind buffer
* buf.get(array); // Copy data into array</pre></blockquote>
*
* @return This buffer
*/
public final Buffer rewind() {
position = 0;
mark = -1;
return this;
}

// ...
}

直接缓冲区与非直接缓冲区(ByteBuffer):

  • 非直接缓冲区:
    优点:在虚拟机内存中创建,易回收
    缺点:但占用虚拟机内存开销,处理中有复制过程。
  • 直接缓冲区:
    优点:在虚拟机内存外,开辟的内存,IO操作直接进行,没有再次复制
    缺点:创建和销毁开销大,没有管理权(基于系统的物理内存没有分代回收机制)


    用通俗的话讲就是,比如你是个小组长(jvm堆内存),你管理者你底下的人,
    但是你的领导(内核[物理空间])要知道你的情况,你需要把你的组内的情况汇报给他(复制),而他自己本身只知道你的情况,你下面人的情况他是不了解的也是不关心的,相当于他把这块区域分配给你,至于你要干什么,他是不管的

JVM创建一个缓冲区的时候,实际上做了如下几件事:

  • JVM确保Heap区域内的空间足够,如果不够则使用触发GC在内的方法获得空间;
  • 获得空间之后会找一组堆内的连续地址分配数组, 这里需要注意的是,在物理内存上,这些字节是不一定连续的;
  • 对于不涉及到IO的操作,这样的处理没有任何问题,但是当进行IO操作的时候就会出现一点性能问题.
    所有的IO操作都需要操作系统进入内核态才行,而JVM进程属于用户态进程, 当JVM需要把一个缓冲区写到某个Channel或Socket的时候,需要切换到内核态.
    而内核态由于并不知道JVM里面这个缓冲区存储在物理内存的什么地址,并且这些物理地址并不一定是连续的(或者说不一定是IO操作需要的块结构),
    所以在切换之前JVM需要把缓冲区复制到物理内存一块连续的内存上, 然后由内核去读取这块物理内存,整合成连续的、分块的内存.

三、AIO

¶代码示例:

服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;

/**
* Server class
*
* @author 蒋时华
* @date 2017/6/24
*/
public class Server {

private static int DEFAULT_PORT = 12345;
private static AsyncServerHandler serverHandle;
public volatile static long clientCount = 0;
public static void start(){
start(DEFAULT_PORT);
}
public static synchronized void start(int port){
if(serverHandle!=null)
return;
serverHandle = new AsyncServerHandler(port);
new Thread(serverHandle,"Server").start();
}
public static void main(String[] args){
Server.start();
}


}

class AsyncServerHandler implements Runnable {
public CountDownLatch latch;
public AsynchronousServerSocketChannel channel;
public AsyncServerHandler(int port) {
try {
//创建服务端通道
channel = AsynchronousServerSocketChannel.open();
//绑定端口
channel.bind(new InetSocketAddress(port));
System.out.println("服务器已启动,端口号:" + port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
//CountDownLatch初始化
//它的作用:在完成一组正在执行的操作之前,允许当前的现场一直阻塞
//此处,让现场在此阻塞,防止服务端执行完成后退出
//也可以使用while(true)+sleep
//生成环境就不需要担心这个问题,以为服务端是不会退出的
latch = new CountDownLatch(1);
//用于接收客户端的连接
channel.accept(this,new AcceptHandler());
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncServerHandler> {
@Override
public void completed(AsynchronousSocketChannel channel,AsyncServerHandler serverHandler) {
//继续接受其他客户端的请求
Server.clientCount++;
System.out.println("连接的客户端数:" + Server.clientCount);
serverHandler.channel.accept(serverHandler, this);
//创建新的Buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
//异步读 第三个参数为接收消息回调的业务Handler
channel.read(buffer, buffer, new ReadHandler(channel));
}
@Override
public void failed(Throwable exc, AsyncServerHandler serverHandler) {
exc.printStackTrace();
serverHandler.latch.countDown();
}
}

class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
//用于读取半包消息和发送应答
private AsynchronousSocketChannel channel;
public ReadHandler(AsynchronousSocketChannel channel) {
this.channel = channel;
}
//读取到消息后的处理
@Override
public void completed(Integer result, ByteBuffer attachment) {
//flip操作
attachment.flip();
//根据
byte[] message = new byte[attachment.remaining()];
attachment.get(message);
try {
String expression = new String(message, "UTF-8");
System.out.println("服务器收到消息: " + expression);
String calrResult = null;
try{
calrResult = Calculator.cal(expression).toString();
}catch(Exception e){
calrResult = "计算错误:" + e.getMessage();
}
//向客户端发送消息
doWrite(calrResult);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
//发送消息
private void doWrite(String result) {
byte[] bytes = result.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
//异步写数据 参数与前面的read一样
channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
//如果没有发送完,就继续发送直到完成
if (buffer.hasRemaining())
channel.write(buffer, buffer, this);
else{
//创建新的Buffer
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
//异步读 第三个参数为接收消息回调的业务Handler
channel.read(readBuffer, readBuffer, new ReadHandler(channel));
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
}
}
});
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
this.channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

final class Calculator {
private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript");
public static Object cal(String expression) throws ScriptException {
return jse.eval(expression);
}
}

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Scanner;
import java.util.concurrent.CountDownLatch;

/**
* Client class
*
* @author 蒋时华
* @date 2017/6/24
*/
public class Client {

private static String DEFAULT_HOST = "127.0.0.1";
private static int DEFAULT_PORT = 12345;
private static AsyncClientHandler clientHandle;
public static void start(){
start(DEFAULT_HOST,DEFAULT_PORT);
}
public static synchronized void start(String ip,int port){
if(clientHandle!=null)
return;
clientHandle = new AsyncClientHandler(ip,port);
new Thread(clientHandle,"Client").start();
}
//向服务器发送消息
public static boolean sendMsg(String msg) throws Exception{
if(msg.equals("q")) return false;
clientHandle.sendMsg(msg);
return true;
}
@SuppressWarnings("resource")
public static void main(String[] args) throws Exception{
Client.start();
System.out.println("请输入请求消息:");
Scanner scanner = new Scanner(System.in);
while(Client.sendMsg(scanner.nextLine()));
}

}

class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable {
private AsynchronousSocketChannel clientChannel;
private String host;
private int port;
private CountDownLatch latch;
public AsyncClientHandler(String host, int port) {
this.host = host;
this.port = port;
try {
//创建异步的客户端通道
clientChannel = AsynchronousSocketChannel.open();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
//创建CountDownLatch等待
latch = new CountDownLatch(1);
//发起异步连接操作,回调参数就是这个类本身,如果连接成功会回调completed方法
clientChannel.connect(new InetSocketAddress(host, port), this, this);
try {
latch.await();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
clientChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
//连接服务器成功
//意味着TCP三次握手完成
@Override
public void completed(Void result, AsyncClientHandler attachment) {
System.out.println("客户端成功连接到服务器...");
}
//连接服务器失败
@Override
public void failed(Throwable exc, AsyncClientHandler attachment) {
System.err.println("连接服务器失败...");
exc.printStackTrace();
try {
clientChannel.close();
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
//向服务器发送消息
public void sendMsg(String msg){
System.out.println("算术表达式为:" + msg);
byte[] req = msg.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
//异步写
clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch));
}
}

class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel clientChannel;
private CountDownLatch latch;
public WriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {
this.clientChannel = clientChannel;
this.latch = latch;
}
@Override
public void completed(Integer result, ByteBuffer buffer) {
//完成全部数据的写入
if (buffer.hasRemaining()) {
clientChannel.write(buffer, buffer, this);
}
else {
//读取数据
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
clientChannel.read(readBuffer,readBuffer,new ClientReadHandler(clientChannel, latch));
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.err.println("数据发送失败...");
try {
clientChannel.close();
latch.countDown();
} catch (IOException e) {
}
}
}

class ClientReadHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel clientChannel;
private CountDownLatch latch;
public ClientReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {
this.clientChannel = clientChannel;
this.latch = latch;
}
@Override
public void completed(Integer result,ByteBuffer buffer) {
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String body;
try {
body = new String(bytes,"UTF-8");
System.out.println("客户端收到结果:"+ body);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc,ByteBuffer attachment) {
System.err.println("数据读取失败...");
try {
clientChannel.close();
latch.countDown();
} catch (IOException e) {
}
}
}

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import java.util.Random;
import java.util.Scanner;

/**
* Test class
*
* @author 蒋时华
* @date 2017/6/24
*/
public class AIOTest {

public static void main(String[] args) throws Exception{
//运行服务器
Server.start();
//避免客户端先于服务器启动前执行代码
Thread.sleep(1000);

//运行客户端
Client.start();
Thread.sleep(3000);

Random random = new Random(System.currentTimeMillis());
char operators[] = {'+','-','*','/'};

while(Client.sendMsg(random.nextInt(10)+""+operators[random.nextInt(4)]+(random.nextInt(10)+1))){
Thread.sleep(random.nextInt(1000));
}
}

}

四、总结

¶同步和异步

我认为的网络层面io的同步和异步描述的是一种消息通知的机制,主动等待消息返回还是被动接受消息
同步io:指的是调用方通过主动等待获取调用返回的结果来获取消息通知。
异步io:指的是被调用方通过某种方式(如,回调函数)来通知调用方获取消息。

¶阻塞和非阻塞

NIO、AIO为什么被称为非阻塞?

  • BIO在发起读请求以后,会一直等待,一直到拿到结果
  • NIO在发起读请求以后,不会立即拿到结果
  • AIO通过回调方法,被动的接受

¶BIO(blocking IO):

同步阻塞式IO 面向流 操作字节或字符 单向传输数据

¶NIO(non blocking IO):

同步非阻塞式IO 面向通道 操作缓冲区 双向传输数据

¶AIO(async IO):

同步非阻塞式IO 大量使用回调函数 异步处理通信过程 异步的双向传输数据

java 代理

发表于 2019-08-20 | 更新于 2019-10-22 | 分类于 Java | 评论数: | 阅读次数:

1、JDK静态代理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package org.jphoebe.edu.demo.proxy.staticProxy;

/**
* 静态代理
*/
public class UserProxy implements StaticPerson {

public static void main(String[] args) {
// 静态代理
StaticUser targetObj = new StaticUser();
new UserProxy(targetObj).exection();

}

public StaticUser user;

public UserProxy(StaticUser user) {
this.user = user;
}

@Override
public void exection() {
System.out.println("执行真正的代码前。。。。。。");
user.exection();
System.out.println("执行真正的代码后。。。。。。");
}

}
interface StaticPerson {
void exection();
}
class StaticUser implements StaticPerson {
@Override
public void exection() {
System.out.println("实际执行的代码");
}
}

2、JDK动态代理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package org.jphoebe.edu.demo.proxy.dynamic;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/**
* 动态代理
*/
public class ProxyHandle implements InvocationHandler {
public static void main(String[] args) {
// 参数:-Dsun.misc.ProxyGenerator.saveGeneratedFiles 可以让jdk生成代理后的class文件, 源码:ProxyGenerator.saveGeneratedFiles
DynamicUser targetObj = new DynamicUser();
ProxyHandle proxyHandle = new ProxyHandle(new DynamicUser());
DynamicPerson proxyObject = (DynamicPerson) Proxy.newProxyInstance(
DynamicUser.class.getClassLoader(),
DynamicUser.class.getInterfaces(),
proxyHandle);
proxyObject.exection();

}
private Object proxyObj;
public ProxyHandle(Object targetObj) {
this.proxyObj = targetObj;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
System.out.println("DynamicProxy类对" + methodName + "进行了增强处理,begin....");
// 执行原有方法
method.invoke(proxyObj, args);
System.out.println("DynamicProxy类对" + methodName + "进行了增强处理,end....");
return null;
}
}
interface DynamicPerson {
void exection();
}
class DynamicUser implements DynamicPerson {
@Override
public void exection() {
System.out.println("实际执行的代码");
}
}

这样就可以解释Mybatis为什么定义一个接口就可以执行sql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package org.jphoebe.edu.demo.proxy.dynamic;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/**
* ProxyFactory class
*
*/
public class MybatisProxy implements InvocationHandler {

public static void main(String[] args) {
// mybatis mapper代理的实现
MybatisProxy mybatisProxy = new MybatisProxy(PersonMapper.class);
PersonMapper proxyObject = (PersonMapper) Proxy.newProxyInstance(
PersonMapper.class.getClassLoader(),
new Class[] { PersonMapper.class },
mybatisProxy);
proxyObject.exection();
}

private Object proxyObj;

public MybatisProxy(Object targetObj) {
this.proxyObj = targetObj;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
System.out.println("执行与该接口对应的mapping xml文件中的sql");
// 执行sql的操作
return null;
}
}

interface PersonMapper {
void exection();
}

动态代理生成的类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package org.jphoebe.edu.demo.proxy.dynamic;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;

final class $Proxy0 extends Proxy implements PersonMapper {
private static Method m1;
private static Method m2;
private static Method m3;
private static Method m0;

public $Proxy0(InvocationHandler var1) throws {
super(var1);
}

public final boolean equals(Object var1) throws {
try {
return (Boolean)super.h.invoke(this, m1, new Object[]{var1});
} catch (RuntimeException | Error var3) {
throw var3;
} catch (Throwable var4) {
throw new UndeclaredThrowableException(var4);
}
}

public final String toString() throws {
try {
return (String)super.h.invoke(this, m2, (Object[])null);
} catch (RuntimeException | Error var2) {
throw var2;
} catch (Throwable var3) {
throw new UndeclaredThrowableException(var3);
}
}

public final void exection() throws {
try {
super.h.invoke(this, m3, (Object[])null);
} catch (RuntimeException | Error var2) {
throw var2;
} catch (Throwable var3) {
throw new UndeclaredThrowableException(var3);
}
}

public final int hashCode() throws {
try {
return (Integer)super.h.invoke(this, m0, (Object[])null);
} catch (RuntimeException | Error var2) {
throw var2;
} catch (Throwable var3) {
throw new UndeclaredThrowableException(var3);
}
}

static {
try {
m1 = Class.forName("java.lang.Object").getMethod("equals", Class.forName("java.lang.Object"));
m2 = Class.forName("java.lang.Object").getMethod("toString");
m3 = Class.forName("org.jphoebe.edu.demo.proxy.dynamic.PersonMapper").getMethod("exection");
m0 = Class.forName("java.lang.Object").getMethod("hashCode");
} catch (NoSuchMethodException var2) {
throw new NoSuchMethodError(var2.getMessage());
} catch (ClassNotFoundException var3) {
throw new NoClassDefFoundError(var3.getMessage());
}
}
}

可以看出,生成的类继承了被代理对象的接口,所以JDK原生的动态代理是基于接口的,如果对象没有使用接口,那么就无法使用jdk动态代理,需要使用cglib动态代理

2、cglib动态代理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package org.jphoebe.edu.demo.proxy.cglib;


import net.sf.cglib.core.DebuggingClassWriter;
import net.sf.cglib.proxy.Enhancer;
import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;

import java.lang.reflect.Method;

/**
* CglibProxy class
*
* @author 蒋时华
* @date 2019/8/29
*/
public class CglibProxy implements MethodInterceptor {

public static void main(String[] args) {
// cglib 代理
// 代理类class文件存入本地磁盘方便我们反编译查看源码
System.setProperty(DebuggingClassWriter.DEBUG_LOCATION_PROPERTY, "E:\\prod\\edu-demo");
// 通过CGLIB动态代理获取代理对象的过程
Enhancer enhancer = new Enhancer();
// 设置enhancer对象的父类
enhancer.setSuperclass(CglibUser.class);
// 设置enhancer的回调对象
enhancer.setCallback(new CglibProxy());
// 创建代理对象
CglibUser proxy= (CglibUser)enhancer.create();
// 通过代理对象调用目标方法
proxy.exection();


// 源码
// MethodInterceptor
// Enhancer.createHelper()
// KeyFactory.generateClass()
// AbstractClassGenerator.create()
// BeanGenerator.nextInstance()
}

/**
* sub:cglib生成的代理对象
* method:被代理对象方法
* objects:方法入参
* methodProxy: 代理方法
*/
/**
* FastClass f1; // net.sf.cglib.test.Target的fastclass
* FastClass f2; // Target$$EnhancerByCGLIB$$788444a0 的fastclass
* int i1; //方法g在f1中的索引
* int i2; //方法CGLIB$g$0在f2中的索引
*
**/
@Override
public Object intercept(Object sub, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {
System.out.println("======插入前置通知======");
Object object = methodProxy.invokeSuper(sub, objects);
System.out.println("======插入后者通知======");
return object;
}
}

class CglibUser{
public void exection() {
System.out.println("实际执行的代码");
}
}

使用cglib代理,会生成

1
2
3
CglibUser$$EnhancerByCGLIB$$45809d07$$FastClassByCGLIB$$db8bd788.class
CglibUser$$EnhancerByCGLIB$$45809d07.class
CglibUser$$FastClassByCGLIB$$575d67e9.class

三个文件
EnhancerByCGLIB:是生成的代理对象
FastClassByCGLIB:是原对象方法的索引地址(避免使用了反射,而是直接调用对象方法)
EnhancerByCGLIB$$45809d07$$FastClassByCGLIB:是生成代理对方法的索引地址
fastclass中的部分代理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public int getIndex(Signature var1) {
String var10000 = var1.toString();
switch(var10000.hashCode()) {
case -1651887568:
if (var10000.equals("exection()V")) {
return 0;
}
break;
case 1826985398:
if (var10000.equals("equals(Ljava/lang/Object;)Z")) {
return 1;
}
break;
case 1913648695:
if (var10000.equals("toString()Ljava/lang/String;")) {
return 2;
}
break;
case 1984935277:
if (var10000.equals("hashCode()I")) {
return 3;
}
}

return -1;
}

生成class源码:

1
2
3
4
5
// MethodInterceptor
// Enhancer.createHelper()
// KeyFactory.generateClass()
// AbstractClassGenerator.create()
// BeanGenerator.nextInstance()

生成对象核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
public void generateClass(ClassVisitor v) {
ClassEmitter ce = new ClassEmitter(v);

Method newInstance = ReflectUtils.findNewInstance(keyInterface);
if (!newInstance.getReturnType().equals(Object.class)) {
throw new IllegalArgumentException("newInstance method must return Object");
}

Type[] parameterTypes = TypeUtils.getTypes(newInstance.getParameterTypes());
ce.begin_class(Constants.V1_2,
Constants.ACC_PUBLIC,
getClassName(),
KEY_FACTORY,
new Type[]{Type.getType(keyInterface)},
Constants.SOURCE_FILE);
EmitUtils.null_constructor(ce);
EmitUtils.factory_method(ce, ReflectUtils.getSignature(newInstance));

int seed = 0;
CodeEmitter e = ce.begin_method(Constants.ACC_PUBLIC,
TypeUtils.parseConstructor(parameterTypes),
null);
e.load_this();
e.super_invoke_constructor();
e.load_this();
List<FieldTypeCustomizer> fieldTypeCustomizers = getCustomizers(FieldTypeCustomizer.class);
for (int i = 0; i < parameterTypes.length; i++) {
Type parameterType = parameterTypes[i];
Type fieldType = parameterType;
for (FieldTypeCustomizer customizer : fieldTypeCustomizers) {
fieldType = customizer.getOutType(i, fieldType);
}
seed += fieldType.hashCode();
ce.declare_field(Constants.ACC_PRIVATE | Constants.ACC_FINAL,
getFieldName(i),
fieldType,
null);
e.dup();
e.load_arg(i);
for (FieldTypeCustomizer customizer : fieldTypeCustomizers) {
customizer.customize(e, i, parameterType);
}
e.putfield(getFieldName(i));
}
e.return_value();
e.end_method();

// hash code
e = ce.begin_method(Constants.ACC_PUBLIC, HASH_CODE, null);
int hc = (constant != 0) ? constant : PRIMES[(Math.abs(seed) % PRIMES.length)];
int hm = (multiplier != 0) ? multiplier : PRIMES[(Math.abs(seed * 13) % PRIMES.length)];
e.push(hc);
for (int i = 0; i < parameterTypes.length; i++) {
e.load_this();
e.getfield(getFieldName(i));
EmitUtils.hash_code(e, parameterTypes[i], hm, customizers);
}
e.return_value();
e.end_method();

// equals
e = ce.begin_method(Constants.ACC_PUBLIC, EQUALS, null);
Label fail = e.make_label();
e.load_arg(0);
e.instance_of_this();
e.if_jump(CodeEmitter.EQ, fail);
for (int i = 0; i < parameterTypes.length; i++) {
e.load_this();
e.getfield(getFieldName(i));
e.load_arg(0);
e.checkcast_this();
e.getfield(getFieldName(i));
EmitUtils.not_equals(e, parameterTypes[i], fail, customizers);
}
e.push(1);
e.return_value();
e.mark(fail);
e.push(0);
e.return_value();
e.end_method();

// toString
e = ce.begin_method(Constants.ACC_PUBLIC, TO_STRING, null);
e.new_instance(Constants.TYPE_STRING_BUFFER);
e.dup();
e.invoke_constructor(Constants.TYPE_STRING_BUFFER);
for (int i = 0; i < parameterTypes.length; i++) {
if (i > 0) {
e.push(", ");
e.invoke_virtual(Constants.TYPE_STRING_BUFFER, APPEND_STRING);
}
e.load_this();
e.getfield(getFieldName(i));
EmitUtils.append_string(e, parameterTypes[i], EmitUtils.DEFAULT_DELIMITERS, customizers);
}
e.invoke_virtual(Constants.TYPE_STRING_BUFFER, TO_STRING);
e.return_value();
e.end_method();

ce.end_class();
}

再来看看正常java代码的字节码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// class version 52.0 (52)
// access flags 0x21
public class org/jphoebe/edu/demo/proxy/staticProxy/UserProxy implements org/jphoebe/edu/demo/proxy/staticProxy/StaticPerson {

// compiled from: UserProxy.java

// access flags 0x1
public Lorg/jphoebe/edu/demo/proxy/staticProxy/StaticUser; user

// access flags 0x9
public static main([Ljava/lang/String;)V
// parameter args
L0
LINENUMBER 10 L0
NEW org/jphoebe/edu/demo/proxy/staticProxy/StaticUser
DUP
INVOKESPECIAL org/jphoebe/edu/demo/proxy/staticProxy/StaticUser.<init> ()V
ASTORE 1
L1
LINENUMBER 11 L1
NEW org/jphoebe/edu/demo/proxy/staticProxy/UserProxy
DUP
ALOAD 1
INVOKESPECIAL org/jphoebe/edu/demo/proxy/staticProxy/UserProxy.<init> (Lorg/jphoebe/edu/demo/proxy/staticProxy/StaticUser;)V
INVOKEVIRTUAL org/jphoebe/edu/demo/proxy/staticProxy/UserProxy.exection ()V
L2
LINENUMBER 13 L2
RETURN
L3
LOCALVARIABLE args [Ljava/lang/String; L0 L3 0
LOCALVARIABLE targetObj Lorg/jphoebe/edu/demo/proxy/staticProxy/StaticUser; L1 L3 1
MAXSTACK = 3
MAXLOCALS = 2

// access flags 0x1
public <init>(Lorg/jphoebe/edu/demo/proxy/staticProxy/StaticUser;)V
// parameter user
L0
LINENUMBER 17 L0
ALOAD 0
INVOKESPECIAL java/lang/Object.<init> ()V
L1
LINENUMBER 18 L1
ALOAD 0
ALOAD 1
PUTFIELD org/jphoebe/edu/demo/proxy/staticProxy/UserProxy.user : Lorg/jphoebe/edu/demo/proxy/staticProxy/StaticUser;
L2
LINENUMBER 19 L2
RETURN
L3
LOCALVARIABLE this Lorg/jphoebe/edu/demo/proxy/staticProxy/UserProxy; L0 L3 0
LOCALVARIABLE user Lorg/jphoebe/edu/demo/proxy/staticProxy/StaticUser; L0 L3 1
MAXSTACK = 2
MAXLOCALS = 2

// access flags 0x1
public exection()V
L0
LINENUMBER 23 L0
GETSTATIC java/lang/System.out : Ljava/io/PrintStream;
LDC "\u6267\u884c\u771f\u6b63\u7684\u4ee3\u7801\u524d\u3002\u3002\u3002\u3002\u3002\u3002"
INVOKEVIRTUAL java/io/PrintStream.println (Ljava/lang/String;)V
L1
LINENUMBER 25 L1
ALOAD 0
GETFIELD org/jphoebe/edu/demo/proxy/staticProxy/UserProxy.user : Lorg/jphoebe/edu/demo/proxy/staticProxy/StaticUser;
INVOKEVIRTUAL org/jphoebe/edu/demo/proxy/staticProxy/StaticUser.exection ()V
L2
LINENUMBER 27 L2
GETSTATIC java/lang/System.out : Ljava/io/PrintStream;
LDC "\u6267\u884c\u771f\u6b63\u7684\u4ee3\u7801\u540e\u3002\u3002\u3002\u3002\u3002\u3002"
INVOKEVIRTUAL java/io/PrintStream.println (Ljava/lang/String;)V
L3
LINENUMBER 28 L3
RETURN
L4
LOCALVARIABLE this Lorg/jphoebe/edu/demo/proxy/staticProxy/UserProxy; L0 L4 0
MAXSTACK = 2
MAXLOCALS = 1
}

会发现有诸多相似的命令,如:

1
2
3
dup (dup())
aload (load_this())
return (return_value())

等等
所以CGlib实际上是直接写字节码来创建代理类,所以也就无所无是否继承了接口,代理类只需要直接代理这个类就行。

总结

¶jdk动态代理

¶特点

  • Interface:对于JDK Proxy,业务类是需要一个Interface的,这是一个缺陷;

  • Proxy:Proxy类是动态产生的,这个类在调用Proxy.newProxyInstance()方法之后,产生一个Proxy类的实力。实际上,这个Proxy类也是存在的,不仅仅是类的实例,这个Proxy类可以保存在硬盘上;

  • Method:对于业务委托类的每个方法,现在Proxy类里面都不用静态显示出来。

  • InvocationHandler:这个类在业务委托类执行时,会先调用invoke方法。invoke方法在执行想要的代理操作,可以实现对业务方法的再包装。

  • JDK动态代理类实现了InvocationHandler接口,重写的invoke方法。
    JDK动态代理的基础是反射机制(method.invoke(对象,参数))Proxy.newProxyInstance()

¶cglib动态代理

¶特点

  • CGLib采用底层的字节码技术,全称是:Code Generation Library,CGLib可以为一个类创建一个子类,在子类中采用方法拦截的技术拦截所有父类方法的调用并顺势织入横切逻辑。
  • 对指定的目标生成一个子类,并覆盖其中方法实现增强,但因为采用的是继承,所以不能对final修饰的类进行代理。
  • 注意:jdk的动态代理只可以为接口去完成操作,而cglib它可以为没有实现接口的类去做代理,也可以为实现接口的类去做代理。

¶性能对比

关于两者之间的性能的话,JDK动态代理所创建的代理对象,在以前的JDK版本中,性能并不是很高,虽然在高版本中JDK动态代理对象的性能得到了很大的提升,但是他也并不是适用于所有的场景。主要体现在如下的两个指标中:

  • CGLib所创建的动态代理对象在实际运行时候的性能要比JDK动态代理高不少,有研究表明,大概要高10倍;
  • 但是CGLib在创建对象的时候所花费的时间却比JDK动态代理要多很多,有研究表明,大概有8倍的差距;
  • 对于singleton的代理对象或者具有实例池的代理,因为无需频繁的创建代理对象,所以比较适合采用CGLib动态代理,反正,则比较适用JDK动态代理。
¶查看 =>>> 具体测试结果

跳台阶

发表于 2019-03-15 | 更新于 2019-03-16 | 分类于 算法技巧 | 评论数: | 阅读次数:

¶题目描述

一只青蛙一次可以跳上1级台阶,也可以跳上2级。求该青蛙跳上一个n级的台阶总共有多少种跳法(先后次序不同算不同的结果)。

¶python 穷举法解二元一次方程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#3x+4y = 100 求 x = ? ,y = ?
x = 0
while x <= (100//3):
if(100 - 3*x) % 4 == 0:
y = (100 - 3*x) // 4
print("共有解:x的值是 %d,y的值是 %d"%(x,y))
x += 1

共有解:x的值是 0,y的值是 25
共有解:x的值是 4,y的值是 22
共有解:x的值是 8,y的值是 19
共有解:x的值是 12,y的值是 16
共有解:x的值是 16,y的值是 13
共有解:x的值是 20,y的值是 10
共有解:x的值是 24,y的值是 7
共有解:x的值是 28,y的值是 4
共有解:x的值是 32,y的值是 1
1
2


<i class="fa fa-angle-left" aria-label="上一页"></i>123…5<i class="fa fa-angle-right" aria-label="下一页"></i>
Jeff-Eric

Jeff-Eric

42 日志
12 分类
28 标签
<i class="fa fa-fw fa-github"></i>GitHub <i class="fa fa-fw fa-envelope"></i>E-Mail
<img src="/blog/images/cc-by-nc-sa.svg" alt="Creative Commons"/>
0%
© 2019 Jeff-Eric
由 Hexo 强力驱动 v4.0.0
|
主题 – NexT.Gemini v7.0.1