-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.html
592 lines (449 loc) · 101 KB
/
index.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>BinaryTree's blog</title>
<meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1">
<meta property="og:type" content="website">
<meta property="og:title" content="BinaryTree's blog">
<meta property="og:url" content="http://yoursite.com/index.html">
<meta property="og:site_name" content="BinaryTree's blog">
<meta name="twitter:card" content="summary">
<meta name="twitter:title" content="BinaryTree's blog">
<link rel="alternate" href="/atom.xml" title="BinaryTree's blog" type="application/atom+xml">
<link rel="icon" href="/favicon.png">
<link href="//fonts.googleapis.com/css?family=Source+Code+Pro" rel="stylesheet" type="text/css">
<link rel="stylesheet" href="/css/style.css">
</head>
<body>
<div id="container">
<div id="wrap">
<header id="header">
<div id="banner"></div>
<div id="header-outer" class="outer">
<div id="header-title" class="inner">
<h1 id="logo-wrap">
<a href="/" id="logo">BinaryTree's blog</a>
</h1>
</div>
<div id="header-inner" class="inner">
<nav id="main-nav">
<a id="main-nav-toggle" class="nav-icon"></a>
<a class="main-nav-link" href="/">Home</a>
<a class="main-nav-link" href="/archives">Archives</a>
</nav>
<nav id="sub-nav">
<a id="nav-rss-link" class="nav-icon" href="/atom.xml" title="RSS Feed"></a>
<a id="nav-search-btn" class="nav-icon" title="Zoeken"></a>
</nav>
<div id="search-form-wrap">
<form action="//google.com/search" method="get" accept-charset="UTF-8" class="search-form"><input type="search" name="q" results="0" class="search-form-input" placeholder="Search"><button type="submit" class="search-form-submit"></button><input type="hidden" name="sitesearch" value="http://yoursite.com"></form>
</div>
</div>
</div>
</header>
<div class="outer">
<section id="main">
<article id="post-rxjava5" class="article article-type-post" itemscope itemprop="blogPost">
<div class="article-meta">
<a href="/2017/04/30/rxjava5/" class="article-date">
<time datetime="2017-04-30T15:34:25.000Z" itemprop="datePublished">2017-04-30</time>
</a>
</div>
<div class="article-inner">
<header class="article-header">
<h1 itemprop="name">
<a class="article-title" href="/2017/04/30/rxjava5/">拥抱RxJava(番外篇):关于RxJava的Tips & Tricks</a>
</h1>
</header>
<div class="article-entry" itemprop="articleBody">
<h2 id="前言:"><a href="#前言:" class="headerlink" title="前言:"></a>前言:</h2><blockquote>
<p>起初写 拥抱RxJava 系列文章。只是因为看到很多人在使用RxJava时候,并没有很正确的理解Reactive Programming。仅仅在项目中使用了Retrofit的Rx Adapter或者使用了一点点RxBus就写道自己的项目中用了RxJava,并以此传道。我觉得这样是不好的。所以写了这一系列更好的介绍RxJava。 但是可能个人的语言能力确实略差,每次都会让读者有些许误解。今天这篇我们就放松一下,分享一下我使用RxJava时的一些Tips & Tricks (本文在为指定背景的情况下,使用的是 RxJava 2.x 版本。 )</p>
</blockquote>
<p>这篇文章很多借鉴了这个presentation中的一些技巧: <a href="https://www.youtube.com/watch?v=QdmkXL7XikQ" target="_blank" rel="external">Common RxJava Mistakes (视频需要科学上网)
</a></p>
<h4 id="0-RxJava-不是网络请求库,不是线程库,RxJava是响应式编程在JVM上的一种实现方式!"><a href="#0-RxJava-不是网络请求库,不是线程库,RxJava是响应式编程在JVM上的一种实现方式!" class="headerlink" title="0 RxJava 不是网络请求库,不是线程库,RxJava是响应式编程在JVM上的一种实现方式!"></a>0 RxJava 不是网络请求库,不是线程库,RxJava是响应式编程在JVM上的一种实现方式!</h4><p>很多人使用RxJava仅仅是因为线程切换方便,或者是因为Retrofit提供了这样一个炫酷的返回方式,或者仅仅是因为RxJava这种链式调用很炫酷,又或是因为大家都用RxJava我不用怕跟不上节奏。<br>如果你使用RxJava仅仅是因为如上几个原因,我建议你放弃RxJava。因为RxJava给你带来的坑将远多于你从RxJava中获得的便利。曾经有一个老爷爷说过一句话:</p>
<blockquote>
<p>With Great Power Comes Great Responsibility<br>力量越大,责任越大</p>
</blockquote>
<p>RxJava能做的远不仅仅是切换线程或者简单的变换(<code>map()</code>,<code>filter()</code>等等)。所以当然我这篇不可能覆盖所有的 RxJava 的坑。如果你想使用RxJava,回头问一下自己使用这个库的初衷。</p>
<h4 id="1-observeOn-vs-subscribeOn"><a href="#1-observeOn-vs-subscribeOn" class="headerlink" title="1 observeOn vs subscribeOn"></a>1 observeOn vs subscribeOn</h4><p>这两个操作符可能对很多人是最常用的两个了。 然而这中间也有很多大坑在这里。</p>
<h5 id="1-1-subscribeOn-控制上游,observeOn控制下游"><a href="#1-1-subscribeOn-控制上游,observeOn控制下游" class="headerlink" title="1.1 subscribeOn 控制上游,observeOn控制下游"></a>1.1 subscribeOn 控制上游,observeOn控制下游</h5><p>很多人误以为 <code>subscribeOn</code> 控制的是Observable的生成的线程而<code>observeOn</code>控制的是 <code>subscribe()</code> 方法发生的线程。 当然,这点你们可以怪扔物线大神,毕竟他在他对RxJava最著名的帖子中写道:</p>
<blockquote>
<ul>
<li>subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。 * observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。 ——扔物线</li>
</ul>
</blockquote>
<p>这个观点不能说是错的(这点我稍后再将),但是更多的是 subscribeOn控制整个上游,而observeOn控制整个下游。 举个例子:</p>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div><div class="line">5</div><div class="line">6</div><div class="line">7</div></pre></td><td class="code"><pre><div class="line">Observable.just("1","2","3")</div><div class="line"> .map(x -> x.length())</div><div class="line"> .subscribeOn(Schedulers.io())</div><div class="line"> .flatMap(x -> Observable.just(x,"pause"))</div><div class="line"> .observeOn(Schedulers.computation())</div><div class="line"> .map(x -> someHeavyCaculation(x))</div><div class="line"> .subscribe(x -> Log.d(TAG, x));</div></pre></td></tr></table></figure>
<p>这段代码中各个操作符是在哪些线程中进行的?<br>我们看下答案:</p>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div><div class="line">5</div><div class="line">6</div><div class="line">7</div></pre></td><td class="code"><pre><div class="line">Observable.just("1","2","3") //IO 线程</div><div class="line"> .map(x -> x.length()) //IO 线程</div><div class="line"> .subscribeOn(Schedulers.io())</div><div class="line"> .flatMap(x -> Observable.just(x,"pause")) //IO 线程</div><div class="line"> .observeOn(Schedulers.computation())</div><div class="line"> .map(x -> someHeavyCaculation(x)) //computation 线程</div><div class="line"> .subscribe(x -> Log.d(TAG, x)); //computation 线程</div></pre></td></tr></table></figure>
<p>所以我们看到了,<code>observeOn</code> 后面的所有操作都会在这个线程工作。<code>subscribeOn</code> 会从这个Observable生成一直到遇到其他 <code>observeOn</code>。所以 <code>observeOn</code> 和 <code>subscribeOn</code> 的位置非常关键。</p>
<p>当然,这点问题 扔物线大神在文章中也详细的讲到了:</p>
<blockquote>
<p>因为 observeOn() 指定的是 Subscriber 的线程,而这个 Subscriber 并不是(严格说应该为『不一定是』,但这里不妨理解为『不是』)subscribe() 参数中的 Subscriber ,而是 observeOn() 执行时的当前 Observable 所对应的 Subscriber ,即它的直接下级 Subscriber 。换句话说,observeOn() 指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可。 ——扔物线</p>
</blockquote>
<p>只不过很多人看帖子看一半,扔物线大神把这最重要的部分放到了<code>lift()</code>后面讲,很多人看完枯燥的<code>lift()</code>后,选择性忽视了最后关于 <code>Scheduler</code>非常关键的这部分。</p>
<h5 id="1-2-subscribeOn只发生一次,observeOn可以使用多次"><a href="#1-2-subscribeOn只发生一次,observeOn可以使用多次" class="headerlink" title="1.2 subscribeOn只发生一次,observeOn可以使用多次"></a>1.2 subscribeOn只发生一次,observeOn可以使用多次</h5><p>如果程序需要多次切换线程,使用多次<code>observeOn</code>是完全可以的。 而subscribeOn只有最上方的<code>subscribeOn</code>会起作用。这点扔物线大神的文章也补充过,大家可以回头再重温一下。</p>
<h5 id="1-3-不是所有操作符都会在默认线程执行"><a href="#1-3-不是所有操作符都会在默认线程执行" class="headerlink" title="1.3 不是所有操作符都会在默认线程执行"></a>1.3 不是所有操作符都会在默认线程执行</h5><p>很多操作符的默认执行线程并不是当前线程,这类操作符有一个特征就是会提供带有 Scheduler 参数的重载方法,比如 <code>interval</code>。 <code>interval</code> 会默认在computation线程执行,如果你在后面加上<code>subscribeOn</code>。 他还是会在computation线程执行,你只有在重载方法里加入其他 Scheduler,他才会在其他线程执行。如果你仔细看过 RxJava 的 JavaDoc。 他都会明确写出这个操作符的默认工作线程。</p>
<h4 id="2-如果可能,避免使用Subject-当然包括RxBus"><a href="#2-如果可能,避免使用Subject-当然包括RxBus" class="headerlink" title="2 如果可能,避免使用Subject 当然包括RxBus!!"></a>2 如果可能,避免使用Subject 当然包括RxBus!!</h4><p>subject 作为Observable的结合体,在使用时非常方便。但是在使用时,很多时候并不尽人意。</p>
<h5 id="2-1-Subject-的行为是不可预期的"><a href="#2-1-Subject-的行为是不可预期的" class="headerlink" title="2.1 Subject 的行为是不可预期的"></a>2.1 Subject 的行为是不可预期的</h5><p>Subject 由于暴露 <code>onNext</code> 方法。非常难控制。任何有这个subject引用的对象都可以使用这个方法传输数据,任何订阅了subject的人都可以接收到这个数据。<br>这导致你订阅Subject后几乎不清楚数据来源到底是谁。甚至也不知道你收到的到底是什么数据。这也是为什么我强烈抵制 RxBus 的一大原因。<br>Subject 由于自己是Observable, 他遵循Observable Contract。 如果其中某个事件出现异常,onError触发,那么这个Subject将再也不能使用。当然这点可以使用 Jake Wharton的 RxRelay来解决。 RxRelay就是一个没有onComplete和onError的Subject。<br>所以如果你的程序中必须使用Subject, 推荐将其设为 private field并且对外只暴露他的Observable形式。</p>
<h5 id="2-2-Subject-默认是Hot-Observable"><a href="#2-2-Subject-默认是Hot-Observable" class="headerlink" title="2.2 Subject 默认是Hot Observable"></a>2.2 Subject 默认是Hot Observable</h5><p>关于 hot/cold Observable我在这篇文章中详细的解释过: <a href="http://www.jianshu.com/p/f5f327c8b612" target="_blank" rel="external">拥抱RxJava(三):关于Observable的冷热,常见的封装方式以及误区</a></p>
<p>Subject默认是热的,也就是说你发送的信息接收者是否接受的到是不一定的。是需要根据情况分析的。具体可以看我关于hot/cold Observable 的文章。</p>
<h5 id="2-3-Again。-不要在继续使用RxBus了"><a href="#2-3-Again。-不要在继续使用RxBus了" class="headerlink" title="2.3 Again。 不要在继续使用RxBus了"></a>2.3 Again。 不要在继续使用RxBus了</h5><p>RxBus 几乎都是基于Subject的再次封装。使得他不仅拥有了Subject是所有缺点还加入了很多缺点,比如他不是类型安全的。 我见过太多的RxBus封装都只是一个接受Object类型的Subject。这个问题当然也有很多RxBus通过 键值对或<code>ofType()</code>等等操作解决。再比如RxBus更容易造成内存泄漏(因为需要将所有事件和订阅者存储在Subject中,)。更多欢迎再次看一下我的第一篇关于RxJava的文章: <a href="http://www.jianshu.com/p/61631134498e" target="_blank" rel="external">放弃RxBus,拥抱RxJava(一)</a></p>
<p>前几天我在Reddit上看到一个人的回复:</p>
<blockquote>
<p>I think EventBus on android is popular because people don’t know how to share a java object reference between android components like a Fragment and an Activity, or 2 Activities and so on. So basically I think people don’t know how 2 Activites can observe the same object for data changes which I think comes from the fact that we still don’t know how to architect our apps properly.</p>
<p>我认为 EventBus在Android上火爆的原因是人们不知道怎么去在Android组件,例如Activity/Fragment之间共享一个Java对象的引用。</p>
</blockquote>
<p>想一想,自己使用EventBus是不是也是这个原因呢?</p>
<h4 id="3-如果你还在使用RxJava-1-x-建议尽快升级2-x版本"><a href="#3-如果你还在使用RxJava-1-x-建议尽快升级2-x版本" class="headerlink" title="3 如果你还在使用RxJava 1.x 建议尽快升级2.x版本"></a>3 如果你还在使用RxJava 1.x 建议尽快升级2.x版本</h4><p>RxJava 2.x 更新了很多新内容。比如将Backpressure机制分离出来做成Flowable等等。 而且RxJava 1.x马上要寿终正寝,进入不再更新的模式(2017年6月)。所以还在使用RxJava 1.X 的同学们尽快更新吧。<br>如果你仍然处于某种原因,必须使用RxJava 1.x, 那么也千万不要使用<code>Observable.create(Observable.OnSubscribe<T> f)</code> 操作符(现已经被Deprecated)创建Observable。使用其他工厂方法或者直接升级为RxJava 2.x (现已经更新到2.1.0)才是正确的选择。</p>
<h4 id="4-关于操作符"><a href="#4-关于操作符" class="headerlink" title="4 关于操作符"></a>4 关于操作符</h4><h5 id="4-1-尽量避免过多的使用操作符,能合并的操作符尽量合并。"><a href="#4-1-尽量避免过多的使用操作符,能合并的操作符尽量合并。" class="headerlink" title="4.1 尽量避免过多的使用操作符,能合并的操作符尽量合并。"></a>4.1 尽量避免过多的使用操作符,能合并的操作符尽量合并。</h5><p>这里的合并不是指使用ObservableTransformer合并。而是指在逻辑上合并,比如:</p>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div></pre></td><td class="code"><pre><div class="line">Observable.just("Hello","World","RxJava")</div><div class="line"> .map(x -> x.length())</div><div class="line"> .map(x -> x + 2)</div><div class="line"> .subscribe(/**********/)</div></pre></td></tr></table></figure>
<p>这里的两个<code>map</code>明显可以写成一个。 我们知道,每个操作符都会根据操作符的特性生成新的Observable,订阅他的上游然后给下游发送数据,避免使用过多的操作符可以降低内存抖动。<br>所以我不是很推荐使用ObservableTransformer来合并出来一个<br><figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div></pre></td><td class="code"><pre><div class="line">ObservableTransformer applyThread = upstream -></div><div class="line"> upstream.subscribeOn(Schedulers.io())</div><div class="line"> .observeOn(AndroidSchedulers.mainThread());</div></pre></td></tr></table></figure></p>
<p>这样虽然在写法上更简单了。但是损失了observeOn的灵活性还额外增加OverHead。得不偿失。当然,如果我们使用Transformer来进行模块解耦,这当然是非常值得的。详细可以参考我的上一篇文章:<br><a href="http://www.jianshu.com/p/42d77c577ff4" target="_blank" rel="external">动手做一个Full Rx App</a></p>
<h5 id="4-2-flatMap并不保证发射顺序。"><a href="#4-2-flatMap并不保证发射顺序。" class="headerlink" title="4.2 flatMap并不保证发射顺序。"></a>4.2 flatMap并不保证发射顺序。</h5><p><code>flatMap</code>是将他每个得到的item转换成一个Observable,然后通过<code>merge</code>融合这些Observable。但是每个对应的Observable发射出去的一个或多个项目并不是完全有序的。如果想要保证发射顺序,使用<code>concatMap</code>。同理,<code>merge</code>操作符也不保证顺序,如果需要有序,使用<code>concat</code>。</p>
<h5 id="4-3-如果不是必要,不要在flatMap中使用过多的操作符。"><a href="#4-3-如果不是必要,不要在flatMap中使用过多的操作符。" class="headerlink" title="4.3 如果不是必要,不要在flatMap中使用过多的操作符。"></a>4.3 如果不是必要,不要在flatMap中使用过多的操作符。</h5><p>我们刚才说了,每个item都会生成一个新的Observable,每个操作符也会。所以如果你的flatMap中有其他操作符,比如下面的代码:<br><figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div></pre></td><td class="code"><pre><div class="line">Observable.fromIterable(list)</div><div class="line"> .flatMap(x -> Observable.just("x",x,"y")</div><div class="line"> .map(item -> "item" + item))</div><div class="line"> .subscribe();</div></pre></td></tr></table></figure></p>
<p>如果你的list中有上万个item。 那么你将会调用这个map上万次,多生成上万个ObservableMap来进行这个操作。 我们可以简单的消除这个OverHead。 将map拿出来,如下:<br><figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div></pre></td><td class="code"><pre><div class="line">Observable.fromIterable(list)</div><div class="line"> .flatMap(x -> Observable.just("x",x,"y"))</div><div class="line"> .map(item -> "item" + item)</div><div class="line"> .subscribe();</div></pre></td></tr></table></figure></p>
<p>这样<code>flatMap</code>后的Observable会统一进行管理。省去了那上万个ObservableMap。</p>
<h5 id="4-4-如果不是必要,不要自己写-Observable-的操作符。"><a href="#4-4-如果不是必要,不要自己写-Observable-的操作符。" class="headerlink" title="4.4 如果不是必要,不要自己写 Observable 的操作符。"></a>4.4 如果不是必要,不要自己写 Observable 的操作符。</h5><p>Observable的每个操作符都有着很复杂的逻辑,就连很多RxJava的专家都会出错。如果你真的想写自己的操作符,我建议你首先阅读这个文章:<br><a href="https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0" target="_blank" rel="external">Writing operators for 2.0
</a><br>详细的介绍了如何写操作符,要遵顼哪些规则。 顺便一提,这个文章有将近1700行,还有三个主要模块处于TBD阶段,并没有完全补充。写操作符的难度可想而知。</p>
<h4 id="小彩蛋:RxJava和Kotlin我最近碰到的一个坑"><a href="#小彩蛋:RxJava和Kotlin我最近碰到的一个坑" class="headerlink" title="小彩蛋:RxJava和Kotlin我最近碰到的一个坑"></a>小彩蛋:RxJava和Kotlin我最近碰到的一个坑</h4><p>最近使用RxJava和Kotlin。 我将<br><figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div></pre></td><td class="code"><pre><div class="line">startWith(idleState());</div></pre></td></tr></table></figure></p>
<p>写成了<br><figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div></pre></td><td class="code"><pre><div class="line">startWith{idleState()};</div></pre></td></tr></table></figure></p>
<p>在我的IDE几乎肉眼难以分辨的区别。由于kotlin的lambda规则,<code>{}</code> 把我需要的变量解析成了lambda表达式,又由于正好是单参数的lambda,可以省略参数转换为 it 的写法。导致我这句仍然可以编译,但重载了错误的操作符。导致整个链条崩溃。那些年和我说kotlin语法更为安全的人你过来我给你加个BUFF!</p>
</div>
<footer class="article-footer">
<a data-url="http://yoursite.com/2017/04/30/rxjava5/" data-id="cj2524lpv000158w10txv3lzs" class="article-share-link">Delen</a>
</footer>
</div>
</article>
<article id="post-rxjava4" class="article article-type-post" itemscope itemprop="blogPost">
<div class="article-meta">
<a href="/2017/04/23/rxjava4/" class="article-date">
<time datetime="2017-04-23T16:50:04.000Z" itemprop="datePublished">2017-04-23</time>
</a>
</div>
<div class="article-inner">
<header class="article-header">
<h1 itemprop="name">
<a class="article-title" href="/2017/04/23/rxjava4/">拥抱RxJava(四):动手做一个Full Rx的 注册界面</a>
</h1>
</header>
<div class="article-entry" itemprop="articleBody">
<h3 id="背景-Background"><a href="#背景-Background" class="headerlink" title="背景/Background"></a>背景/Background</h3><p>前阵子不久,Jake Wharton 在Devoxx 的演讲: <a href="http://jakewharton.com/the-state-of-managing-state-with-rxjava/" target="_blank" rel="external">The State of Managing State with RxJava</a> 中提出了一个类似于Redux的 Full Rx 的App 结构。 如下图:</p>
<p><img src="http://upload-images.jianshu.io/upload_images/2417399-733a8cebf527bd65.PNG?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240" alt="Redux"></p>
<p>整个结构全部由RxJava控制 state。 和传统MVX结构类似,也是大致分UI层(View),中间层(Presenter/ViewModel/Controller或者我更喜欢叫Translator)和数据层(Model)。大致流程如下:</p>
<ol>
<li>Ui层(View)层将用户输入数据打包成UiEvent传递给中间层</li>
<li>中间层(Translator)将Event处理成对应的Action交给数据处理层。</li>
<li>处理结果打包成对应的Result交还给Translator</li>
<li>Translator将数据结果打包成对应的UiModel交换给Ui做对应的Ui显示。</li>
</ol>
<h3 id="实现-Demo"><a href="#实现-Demo" class="headerlink" title="实现/Demo"></a>实现/Demo</h3><p>我们先一步一步写个Demo看下这个结构的优缺点吧!<br>为了方便,我直接使用Android Studio提供的LoginActivity模板。<br>我们的目的是要做一个注册界面,为了简化只有用户名,密码。首先我们来定义Event:</p>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div><div class="line">5</div><div class="line">6</div><div class="line">7</div><div class="line">8</div><div class="line">9</div><div class="line">10</div><div class="line">11</div><div class="line">12</div><div class="line">13</div></pre></td><td class="code"><pre><div class="line">public class AuthEvent {</div><div class="line"> public final static class SignUpEvent extends AuthEvent {</div><div class="line"> private final String username;</div><div class="line"> private final String password;</div><div class="line"></div><div class="line"> public SignUpEvent(String username, String password) {</div><div class="line"> this.username = username;</div><div class="line"> this.password = password;</div><div class="line"> }</div><div class="line"> //... getters</div><div class="line"> }</div><div class="line"></div><div class="line">}</div></pre></td></tr></table></figure>
<p>这里SignUpEvent继承自AuthEvent是为了统一逻辑。这样我们可以在一整条stream里实现我们所有的逻辑。<br>我们在Ui层将这个Event打包(这里我使用RxBinding):<br><figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div></pre></td><td class="code"><pre><div class="line">Observable<SignUpEvent> click = RxView.clicks(mEmailSignInButton)</div><div class="line"> .map(ignore -> new SignUpEvent(mEmailView.getText().toString(),</div><div class="line"> mPasswordView.getText().toString()));</div></pre></td></tr></table></figure></p>
<p>这样我们每次点击按钮就会发射一个SignUpEvent出来。</p>
<p>再来我们定义我们的UiModel,我们首先要想好,我们的Ui到底有几种状态,我们将各种状态提前定义。我大致觉得我们需要四种状态:</p>
<ol>
<li>idle 初始状态,就是用户第一次进入的状态</li>
<li>inProcess 状态,也就Ui界面等待注册是否成功的状态</li>
<li>success 状态,注册成功 进行下一步操作</li>
<li>fail 状态,注册失败,返回失败信息。</li>
</ol>
<p>根据这四种状态,我们来定义UiModel:</p>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div><div class="line">5</div><div class="line">6</div><div class="line">7</div><div class="line">8</div><div class="line">9</div><div class="line">10</div><div class="line">11</div><div class="line">12</div><div class="line">13</div><div class="line">14</div><div class="line">15</div><div class="line">16</div><div class="line">17</div><div class="line">18</div><div class="line">19</div><div class="line">20</div><div class="line">21</div><div class="line">22</div><div class="line">23</div><div class="line">24</div><div class="line">25</div><div class="line">26</div><div class="line">27</div><div class="line">28</div><div class="line">29</div><div class="line">30</div><div class="line">31</div><div class="line">32</div><div class="line">33</div></pre></td><td class="code"><pre><div class="line"></div><div class="line">public class AuthUiModel {</div><div class="line"> private final boolean inProcess;</div><div class="line"> private final boolean usrValidate;</div><div class="line"> private final boolean pwdValidate;</div><div class="line"> private final boolean success;</div><div class="line"> private final String errorMessage;</div><div class="line"></div><div class="line"> private AuthUiModel(boolean inProcess, boolean usrValidate, boolean pwdValidate, boolean success, String errorMessage) {</div><div class="line"> this.inProcess = inProcess;</div><div class="line"> this.usrValidate = usrValidate;</div><div class="line"> this.pwdValidate = pwdValidate;</div><div class="line"> this.success = success;</div><div class="line"> this.errorMessage = errorMessage;</div><div class="line"> }</div><div class="line"></div><div class="line"> public static AuthUiModel idle() {</div><div class="line"> return new AuthUiModel(false, true, true, false, "");</div><div class="line"> }</div><div class="line"></div><div class="line"> public static AuthUiModel inProcess() {</div><div class="line"> return new AuthUiModel(true, true, true, false, "");</div><div class="line"> }</div><div class="line"></div><div class="line"> public static AuthUiModel success() {</div><div class="line"> return new AuthUiModel(false, true, true, true, "");</div><div class="line"> }</div><div class="line"></div><div class="line"> public static AuthUiModel fail(boolean username, boolean password, String msg) {</div><div class="line"> return new AuthUiModel(false, username, password, false, msg);</div><div class="line"> }</div><div class="line"> //... getters</div><div class="line">}</div></pre></td></tr></table></figure>
<p>再来是Model层,我们这里用一个简单的AuthManager来管理,解耦出来后这里可以替换成任意你喜欢的注册方式:<br><figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div><div class="line">5</div><div class="line">6</div><div class="line">7</div><div class="line">8</div><div class="line">9</div><div class="line">10</div><div class="line">11</div><div class="line">12</div><div class="line">13</div><div class="line">14</div><div class="line">15</div><div class="line">16</div><div class="line">17</div><div class="line">18</div><div class="line">19</div><div class="line">20</div><div class="line">21</div><div class="line">22</div></pre></td><td class="code"><pre><div class="line">public class AuthManager {</div><div class="line"> private SignUpResult result;</div><div class="line"> private Observable<SignUpResult> observable = Observable.fromCallable(() -> result)</div><div class="line"> //延迟2s发送结果,模拟网络请求延迟</div><div class="line"> .delay(2000, TimeUnit.MILLISECONDS);</div><div class="line"></div><div class="line"> public Observable<AuthResult.SignUpResult> signUp(SignUpAction action) {</div><div class="line"> //检查用户名是否合法</div><div class="line"> if (TextUtils.isEmpty(action.getUsername()) || !action.getUsername().contains("@")) {</div><div class="line"> result = SignUpResult.FAIL_USERNAME;</div><div class="line"> }</div><div class="line"> //检查密码合法</div><div class="line"> else if (TextUtils.isEmpty(action.getPassword()) || action.getPassword().length() < 9) {</div><div class="line"> result = SignUpResult.FAIL_PASSWORD;</div><div class="line"> } else {</div><div class="line"> //检查结束,返回注册成功的信息</div><div class="line"> // TODO: createUser</div><div class="line"> result = SignUpResult.SUCCESS;</div><div class="line"> }</div><div class="line"> return observable;</div><div class="line"> }</div><div class="line">}</div></pre></td></tr></table></figure></p>
<p>这里SignUpAction里定义了我们注册所有需要的信息,代码和SignUpEvent几乎雷同。但是分离的好处是可以对数据进行在处理或者合并打包等等。</p>
<p>Ui和Model都准备好了,我们开始我们的Translator部分。 Translator部分主要又ObservableTransformer组成。 将各个部件组装,具体如下:<br><figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div><div class="line">5</div><div class="line">6</div><div class="line">7</div><div class="line">8</div><div class="line">9</div><div class="line">10</div><div class="line">11</div><div class="line">12</div><div class="line">13</div><div class="line">14</div><div class="line">15</div><div class="line">16</div><div class="line">17</div><div class="line">18</div><div class="line">19</div><div class="line">20</div><div class="line">21</div><div class="line">22</div><div class="line">23</div></pre></td><td class="code"><pre><div class="line">public final ObservableTransformer<SignUpEvent, AuthUiModel> signUp</div><div class="line"> //上游是UiEvent,封装成对应的Action</div><div class="line"> = observable -> observable.map(event -> new SignUpAction(event.getUsername(), event.getPassword()))</div><div class="line"> //使用FlatMap转向,进行注册</div><div class="line"> .flatMap(action -> authManager.signUp(action)</div><div class="line"> //扫描结果</div><div class="line"> .map(signUpResult -> {</div><div class="line"> if (signUpResult == SignUpResult.FAIL_USERNAME) {</div><div class="line"> return AuthUiModel.fail(false, true, "Username error");</div><div class="line"> }</div><div class="line"> if (signUpResult == SignUpResult.FAIL_PASSWORD) {</div><div class="line"> return AuthUiModel.fail(true, false, "Password error");</div><div class="line"> }</div><div class="line"> if (signUpResult == SignUpResult.SUCCESS) {</div><div class="line"> return AuthUiModel.success();</div><div class="line"> }</div><div class="line"> //TODO Handle error</div><div class="line"> throw new IllegalArgumentException("Unknown Result");</div><div class="line"> })</div><div class="line"> //设置初始状态为loading。</div><div class="line"> .startWith(AuthUiModel.inProcess())</div><div class="line"> //设置错误状态为error,防止触发onError() 造成断流</div><div class="line"> .onErrorReturn(error -> AuthUiModel.fail(true, true, error.getMessage())));</div></pre></td></tr></table></figure></p>
<p>这样我们在Activity里 将各个部分通过Translator组装:<br><figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div><div class="line">5</div><div class="line">6</div><div class="line">7</div><div class="line">8</div><div class="line">9</div><div class="line">10</div><div class="line">11</div><div class="line">12</div><div class="line">13</div><div class="line">14</div><div class="line">15</div><div class="line">16</div><div class="line">17</div><div class="line">18</div><div class="line">19</div><div class="line">20</div><div class="line">21</div><div class="line">22</div></pre></td><td class="code"><pre><div class="line">disposables.add(click.compose(translator.signUp)</div><div class="line"> .observeOn(AndroidSchedulers.mainThread())</div><div class="line"> .subscribe(authUiModel -> {</div><div class="line"> //载入进度条</div><div class="line"> mProgressView.setVisibility(authUiModel.isInProcess() ? View.VISIBLE : View.GONE);</div><div class="line"> //判断用户名/密码是否合法</div><div class="line"> if (!authUiModel.isPwdValidate()) {</div><div class="line"> mPasswordView.setError(authUiModel.getErrorMessage());</div><div class="line"> } else {</div><div class="line"> mPasswordView.setError(null);</div><div class="line"> }</div><div class="line"> if (!authUiModel.isUsrValidate()) {</div><div class="line"> mEmailView.setError(authUiModel.getErrorMessage());</div><div class="line"> } else {</div><div class="line"> mEmailView.setError(null);</div><div class="line"> }</div><div class="line"> //是否成功</div><div class="line"> if (authUiModel.isSuccess()) {</div><div class="line"> Toast.makeText(this, "CreateUser SuccessFull", Toast.LENGTH_SHORT)</div><div class="line"> .show();</div><div class="line"> }</div><div class="line"> }));</div></pre></td></tr></table></figure></p>
<p>很明显的看到,在Activity中 只有Ui相关的处理,而中间的逻辑通过translator解耦出来,对Activity不可见。</p>
<h3 id="问题-Issues"><a href="#问题-Issues" class="headerlink" title="问题/Issues"></a>问题/Issues</h3><p>但是,问题来了。这里些许Bug.由于我们使用Transformer. 每次转屏的时候会通过RxView来生成新的Observable.这样我们的translator并没有复用,还是绑定在了生命周期上。那么如何解决?</p>
<p>我们设想一下,如果中间的Translator可以随时接受下游的订阅而且无论下游是否有订阅,他都可以一直运行,这样不就在下游彻底解耦了吗?这种特性的Observable我在上一篇文章中说到是ConnectableObservable。这里我们使用Replay(1)。这样我们就每次重新订阅,也会获得最近的一次UiModel,再也不用担心转屏/内存重启。</p>
<p>下游解决了,那上游呢?如果上游每次调用这个Transformer,每次还是一个新的Observable啊。理想的情况应该是我们有一个中间人,他不断接受Ui层传过来的UiEvent然后交给我们Transformer, 这样我们就能一直复用我们的Transformer。也就是他既作为一个Observer订阅上游UiEvent又作为一个Observable,给下游传递数据。那么答案呼之欲出,我们需要一个Subject作为中间人。<br>改善后的Translator代码如下:</p>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div><div class="line">5</div><div class="line">6</div><div class="line">7</div><div class="line">8</div><div class="line">9</div><div class="line">10</div><div class="line">11</div><div class="line">12</div><div class="line">13</div><div class="line">14</div><div class="line">15</div><div class="line">16</div><div class="line">17</div><div class="line">18</div><div class="line">19</div><div class="line">20</div><div class="line">21</div><div class="line">22</div><div class="line">23</div><div class="line">24</div><div class="line">25</div><div class="line">26</div><div class="line">27</div><div class="line">28</div><div class="line">29</div><div class="line">30</div><div class="line">31</div><div class="line">32</div><div class="line">33</div><div class="line">34</div><div class="line">35</div><div class="line">36</div><div class="line">37</div><div class="line">38</div><div class="line">39</div><div class="line">40</div></pre></td><td class="code"><pre><div class="line">public class AuthTranslator {</div><div class="line"> private AuthManager authManager;</div><div class="line"> private Subject<SignUpEvent> middle = PublishSubject.create();</div><div class="line"> private Observable<AuthUiModel> authUiModelObservable</div><div class="line"> = middle.map(event -> new SignUpAction(event.getUsername(), event.getPassword()))</div><div class="line"> //使用FlatMap转向,进行注册</div><div class="line"> .flatMap(action -> authManager.signUp(action)</div><div class="line"> //扫描结果</div><div class="line"> .map(signUpResult -> {</div><div class="line"> if (signUpResult == SignUpResult.FAIL_USERNAME) {</div><div class="line"> return AuthUiModel.fail(false, true, "Username error");</div><div class="line"> }</div><div class="line"> if (signUpResult == SignUpResult.FAIL_PASSWORD) {</div><div class="line"> return AuthUiModel.fail(true, false, "Password error");</div><div class="line"> }</div><div class="line"> if (signUpResult == SignUpResult.SUCCESS) {</div><div class="line"> return AuthUiModel.success();</div><div class="line"> }</div><div class="line"> //TODO Handle error</div><div class="line"> throw new IllegalArgumentException("Unknown Result");</div><div class="line"> })</div><div class="line"> //设置初始状态为loading。</div><div class="line"> .startWith(AuthUiModel.inProcess())</div><div class="line"> //设置错误状态为error,防止触发onError() 造成断流</div><div class="line"> .onErrorReturn(error -> AuthUiModel.fail(true, true, error.getMessage())))</div><div class="line"> .replay(1)</div><div class="line"> .autoConnect();</div><div class="line"></div><div class="line"> public final ObservableTransformer<SignUpEvent, AuthUiModel> signUp</div><div class="line"> //上游是UiEvent,封装成对应的Action</div><div class="line"> = observable -> {</div><div class="line"> //中间人切换监听</div><div class="line"> observable.subscribe(middle);</div><div class="line"> return authUiModelObservable;</div><div class="line"> };</div><div class="line"></div><div class="line"> public AuthTranslator(AuthManager authManager) {</div><div class="line"> this.authManager = authManager;</div><div class="line"> }</div><div class="line">}</div></pre></td></tr></table></figure>
<p>这样我们刚才说的两个Bug就解决了。而且即使我们在请求中转屏,也毫无问题。</p>
<p><img src="http://upload-images.jianshu.io/upload_images/2417399-b1cc3bcbd41d469e.gif?imageMogr2/auto-orient/strip" alt=""></p>
<h3 id="总结"><a href="#总结" class="headerlink" title="总结"></a>总结</h3><p>实践一下这个结构确实有很多优点。</p>
<ol>
<li>将一整条state stream解耦分成几块,但又保持了一整条的结构。</li>
<li>相比传统MVX模式,多次控制翻转(Ioc),解耦更彻底</li>
<li>由于RxJava强大的操作符群。可以实现很多意想不到的功能</li>
</ol>
<p>缺点也蛮明显:</p>
<ol>
<li>我个人对这个架构理解也不是特别深入,中间的middle部分虽然用Subject 但是确实有其不稳定性,比如onError/onComplete会停止这个Subject造成断流</li>
<li>由于解耦彻底,造成需要很多辅助类,茫茫多的boilerplate。 不过这个在kotlin上有很好的发挥,sealed class,when 等语法几乎是为其量身定做。</li>
<li>难,真的难。比传统MVP,甚至MVVM需要更清晰,更合理的设计。不提前想好use case就开始写几乎是不可能的。而且RxJava如果不熟悉,调试起来确实很难。经常不能定位到代码。最好做单元测试各个模块。</li>
</ol>
<p>最后附上这个Demo 的GitHub Repo: <a href="https://github.com/wbinarytree/RxAuthDemo" target="_blank" rel="external">RxAuthDemo</a></p>
</div>
<footer class="article-footer">
<a data-url="http://yoursite.com/2017/04/23/rxjava4/" data-id="cj2524lpp000058w1t20usjj3" class="article-share-link">Delen</a>
</footer>
</div>
</article>
<article id="post-rxjava-3" class="article article-type-post" itemscope itemprop="blogPost">
<div class="article-meta">
<a href="/2017/04/09/rxjava-3/" class="article-date">
<time datetime="2017-04-09T16:36:52.000Z" itemprop="datePublished">2017-04-09</time>
</a>
</div>
<div class="article-inner">
<header class="article-header">
<h1 itemprop="name">
<a class="article-title" href="/2017/04/09/rxjava-3/">拥抱RxJava(三):关于Observable的冷热,常见的封装方式以及误区</a>
</h1>
</header>
<div class="article-entry" itemprop="articleBody">
<p>前两篇文章 <a href="http://www.jianshu.com/p/61631134498e" target="_blank" rel="external">放弃RxBus,拥抱RxJava(一):为什么避免使用EventBus/RxBus</a> ,<a href="http://www.jianshu.com/p/d2df6bceeff9" target="_blank" rel="external">放弃RxBus,拥抱RxJava(二):Observable究竟如何封装数据?</a> 写了一堆理论。看起来并没有什么实际用处,那么今天。我们实战一下,来封装我们需要的数据,并且了解一下各种方式具体的区别。</p>
<blockquote>
<p>前言: 很多朋友误会我文章的意思。我写这个系列文章的意思主要是帮助了解一下RxJava的常见用法。而不是使用一下自己或别人封装好的RxBus就觉得自己的项目使用RxJava了。但是这也仅仅是个人口味问题,很多情况下确实RxBus/EventBus会很方便,很刺激,很上瘾。所以从这篇文章开始,我把标题中的”放弃RxBus”去除。</p>
</blockquote>
<p>无论在简书,微信平台,GitHub等等分享平台。一个名字上写着 “MVP(MVVM) + RxJava + Retrofit + Dagger2 + ……..”这样的名字,再熟悉不过了。然而,大多数情况进去看一下RxJava部分。要么就是简单的把取到的数据用<code>Observable.just()</code>直接传给下一层,要么就是直接使用Retrofit的Adapter来直接获得Observable,而app中其他部分并没有reactive。而且还有很多Observable用法错误,比如冷热不分,连续太多的Map/FlatMap等等。</p>
<h4 id="0-RxBus-Retrofit-足够用了,我为什么要让自己的App-更加的Reactive"><a href="#0-RxBus-Retrofit-足够用了,我为什么要让自己的App-更加的Reactive" class="headerlink" title="0. RxBus/Retrofit 足够用了,我为什么要让自己的App 更加的Reactive?"></a>0. RxBus/Retrofit 足够用了,我为什么要让自己的App 更加的Reactive?</h4><p>为什么不用RxBus我已经写了两篇文章了,可能由于我不常写文,很多人并没有理解。在这里我再解释一次:EventBus如果是一辆穿梭在所有代码之间的公交车。那么Observable就是穿梭在少许人之间的Uber专车。他比起EventBus有很多优势,比如异常处理,线程切换,强大的操作符等等。你当然可以做出一辆超级Uber来当全局公交车(RxBus)使用,然而这却损失了RxJava本来的许多优势,并且又给自己挖了许多坑。</p>
<h5 id="0-1-一个常见误区,过多的operator"><a href="#0-1-一个常见误区,过多的operator" class="headerlink" title="0.1 一个常见误区,过多的operator"></a>0.1 一个常见误区,过多的operator</h5><p>刚开始使用RxJava的时候,我们会觉得operator的链式调用会非常的爽,一个简单的例子:</p>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div><div class="line">5</div><div class="line">6</div><div class="line">7</div><div class="line">8</div></pre></td><td class="code"><pre><div class="line">Observable.just("1", "2", "3", "4", "5", "6", "7")</div><div class="line"> .map(x -> Integer.valueOf(x))</div><div class="line"> .map(x -> x * 2)</div><div class="line"> .map(x -> x + 4)</div><div class="line"> .filter(x -> x >2)</div><div class="line"> // and much more operators</div><div class="line"> .subscribeOn(Schedulers.io())</div><div class="line"> .observeOn(AndroidSchedulers.mainThread());</div></pre></td></tr></table></figure>
<p>当你只有很少数据的时候,这样当然可以,但是你数据量上来的时候,这就会有很多的overhead。 其实几乎所有的operator都会给你生成一个新的Observable。所以在上面这个例子中,我们在过程中生成了至少7个Observable。然而我们完全可以将中间的.map().map().map().filter合并在一个FlatMap中,减少很多的overhead。</p>
<h4 id="1-Observable-just-的局限性。"><a href="#1-Observable-just-的局限性。" class="headerlink" title="1. Observable.just()的局限性。"></a>1. Observable.just()的局限性。</h4><ol>
<li>使用<code>Observable.just()</code> 即使你没有调用subscribe方法。just()括号里面的代码也已经执行了。显然,<code>Observable.just()</code>不适合封装网络数据,因为我们通常不想在subscribe之前做网络请求。<br>举个例子:</li>
</ol>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div><div class="line">5</div><div class="line">6</div></pre></td><td class="code"><pre><div class="line">class TestClass{</div><div class="line"> TestClass(){</div><div class="line"> System.out.println("I'm created!");</div><div class="line"> }</div><div class="line">}</div><div class="line">Observable.just(new TestClass());</div></pre></td></tr></table></figure>
<p>这时你运行代码,你就看到确实你的TestClass 已经被创建了:</p>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div></pre></td><td class="code"><pre><div class="line">I/System.out: I'm created!</div></pre></td></tr></table></figure>
<p> 当然,这个可以简单的用<code>defer()</code>/<code>fromCallable()</code>/<code>create()</code>操作符来是实现只有subscribe只有才加载。<br> 比如:</p>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div></pre></td><td class="code"><pre><div class="line">// use fromCallable</div><div class="line">Observable.fromCallable(TestClass::new);</div><div class="line">//or</div><div class="line">Observable.defer(() -> Observable.just(new TestClass()));</div></pre></td></tr></table></figure>
<ol>
<li>Observable.just()不够灵活。虽然说设计模式上我们追求 “Minimize Mutability” 但是如果我们的程序越来越 reactive的时候。一个 ObservableJust 往往是不满足需求的。比如之前一定订阅的subscriber。如果数据更新了,你不可以同过ObservableJust 来通知所有的Observable 新数据更新了,需要你的subscriber主动更新。这显然有悖于我们追求的reactive programming。 主动pull数据而不是数据告诉你,我更新了然后再做出反应。</li>
</ol>
<p>当然ObservableJust在很多情况下,确实不错。如果你不需要监听后续的更新,那么ObservableJust可以满足你的需求。</p>
<h4 id="2-Hot-Observable-和-cold-Observable"><a href="#2-Hot-Observable-和-cold-Observable" class="headerlink" title="2. Hot Observable 和 cold Observable"></a>2. Hot Observable 和 cold Observable</h4><p>很多人在封装数据的时候,并没有太多考虑冷热的问题,通常情况下并不会出错。因为目前很多开源项目(Demo)里除了RxBus,并没有太多的RxJava的实时情况。然而,当你的App越来越Reactive的时候,冷热便是一个必须考虑的问题。<br>Hot Observable 意思是如果他开始传输数据,你不主动喊停(<code>dispose()</code>/<code>cancel()</code>),那么他就不会停,一直发射数据,即使他已经没有Subscriber了。而Cold Observable则是subscribe时才会发射数据。<br>然而,问题来了。我上篇文章讲过,只有subscribeActual方法调用了的时候,Observable发射数据,那为什么Hot Observable没有Subscriber也会发射数据,他把数据发射给谁了呢?我们在解决这个问题之前,先看一下Cold Observable:</p>
<h5 id="2-1-Cold-Observable"><a href="#2-1-Cold-Observable" class="headerlink" title="2.1 Cold Observable"></a>2.1 Cold Observable</h5><p>我们常见的工厂方法提供的都是ColdObservable,包括<code>just()</code>,<code>fromXX</code>,<code>create()</code>,<code>interval()</code>,<code>defer()</code>。 他们的共同点是当你有多个Subscriber的时候,他们的事件是独立的,举个例子:</p>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div></pre></td><td class="code"><pre><div class="line">Observable interval = Observable.interval(1,TimeUnit.SECONDS);</div></pre></td></tr></table></figure>
<p>如果我们有两个subscriber,那么他们会各自有自己的计时器,并且互不干扰。效果如下图:</p>
<p><img src="http://upload-images.jianshu.io/upload_images/2417399-72f4c48a5108c4d7.gif?imageMogr2/auto-orient/strip" alt=""></p>
<h5 id="2-2-Hot-Observable"><a href="#2-2-Hot-Observable" class="headerlink" title="2.2 Hot Observable"></a>2.2 Hot Observable</h5><p>不同于Cold Observable, Hot Observable是共享数据的。对于Hot Observable的所有subscriber,他们会在同一时刻收到相同的数据。我们通常使用<code>publish()</code>操作符来将ColdObservable变为Hot。或者我们在RxBus中常常用到的<code>Subjects</code> 也是Hot Observable。<br>刚刚我们刚刚提出了一个问题,</p>
<blockquote>
<p>既然Hot Observable在没有subscriber的时候,还会继续发送数据,那么数据究竟发给谁了呢?</p>
</blockquote>
<p>其实Hot Observable其实并没有发送数据,而是他上层的Observable 发送数据给这个hot Observable。不信?我们来分别看一下:</p>
<h6 id="2-2-1-ConnectableObservable"><a href="#2-2-1-ConnectableObservable" class="headerlink" title="2.2.1 ConnectableObservable"></a>2.2.1 ConnectableObservable</h6><p>我们在上面的误区中知道了,几乎所有operator都会生成一个新的Observable。publish当然不例外。但是有区别的是,publish会给你一个ConnectableObservable。具体实现类是ObservablePublish。这个Observable的区别是他提供一个<code>connect()</code>方法,如果你调用<code>connect()</code>方法,ConnectableObservable就会开始接收上游Observable的数据。我们来测试一下:</p>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div></pre></td><td class="code"><pre><div class="line">ConnectableObservable interval = Observable.interval(1, TimeUnit.SECONDS).publish();</div><div class="line">//connect even when no subscribers</div><div class="line">interval.connect();</div></pre></td></tr></table></figure>
<p><img src="http://upload-images.jianshu.io/upload_images/2417399-144ff80f8b1010d9.gif?imageMogr2/auto-orient/strip" alt=""></p>
<p>果然,由于我们subscribe晚了一些。0这个数据没有收到,当我们两个 <code>Subscriber</code> <code>都dispose的时候,ConnectableObservable</code> 也仍在接受数据,导致我们6这个数据没有接收到。<br><code>ConnectableObservable</code> 其实在内部,有一个<code>PublishObserver</code>,他有两个作用。一个是当我们调用 <code>connect()</code>方法时, <code>PublishObserver</code>开始接受上游的数据,我们的例子里便是 <code>Observable.interval(1, TimeUnit.SECONDS)</code> 。所以才能在我们没有调用 <code>subscribe</code>方法时,他也能开始发送数据。第二个作用是 <code>PublishObserver</code>存储所有的下游Subscriber, 也就是我们例子中的Subscriber1 和Subscriber2,在 <code>PublishObserver</code> 每次接到一个上游数据,就会将接收到的结果,依次分发给他存储的所有 <code>Subscribers</code> ,如果下游 <code>Subscriber</code> 调用了 <code>dispose</code>方法,那么他就会在自己的缓存中删除这个 Subscriber,下次接受到上游数据便不会传给这个<code>Subscriber</code>。<br>那么这时候,有同学应该要问了:</p>
<blockquote>
<p>我们可不可以停止从上游接受数据?</p>
</blockquote>
<p>我们当然可以。但是从设计的角度,RxJava为了提供链式调用。 <code>connect()</code>方法会返回一个 Disposable 给我们来控制是否继续接受上游的数据。</p>
<h5 id="2-2-2-ConnectableObservable的常用操作符"><a href="#2-2-2-ConnectableObservable的常用操作符" class="headerlink" title="2.2.2 ConnectableObservable的常用操作符"></a>2.2.2 ConnectableObservable的常用操作符</h5><p>我们当然不希望每次都手动控制 <code>ConnectableObservable</code>的开关。RxJava给我们提供了一些常用的控制操作符</p>
<ol>
<li><p>refCount()<br><code>refCount()</code>可以说是最常用的操作符了。他会把 <code>ConnectableObservable</code>变为一个通常的Observable但又保持了HotObservable的特性。也就是说,如果出现第一个Subscriber,他就会自动调用 <code>connect()</code>方法,如果他开始接受之后,下游的 <code>Subscribers</code>全部dispose,那么他也会停止接受上游的数据。具体看图:<br><img src="http://upload-images.jianshu.io/upload_images/2417399-4d400a8bf1ffa3ea.gif?imageMogr2/auto-orient/strip" alt=""></p>
<p>每个 <code>Subscriber</code> 每次都会接受同样的数据,但是当所有 <code>subscriber</code> 都 dispose时候,他也会自动dipose上游的 <code>Observable</code> 。所以我们重新subscribe的时候,又重新从0开始。<br>这个操作符常用到,RxJava将他和publish合并为一个操作符 :<code>share()</code>。</p>
</li>
<li><p>autoConnect()<br><code>autoConnect()</code>看名字就知道,他会自动链接,如果你单纯调用 <code>autoConnect()</code> ,那么,他会在你链接第一个 <code>Subscriber</code> 的时候调用 <code>connect()</code>,或者你调用 <code>autoConnect(int Num)</code>,那么他将会再收到Num个 <code>subscriber</code>的时候链接。<br>但是,这个操作符的关键在于,由于我们为了链式调用,autoConnect会返回Observable给你,你不会在返回方法里获得一个 <code>Disposable</code>来控制上游的开关。 不过没问题,autoConnect提供了另一种重载方法 :<br><code>autoConnect(int numberOfSubscribers, Consumer<? super Disposable> connection)</code><br>他会在这个 <code>Consumer</code>传给你 你需要的那个总开关。而且,autoConnect并不会autoDisconnect, 也就是如果他即使没有subscriber了。他也会继续接受数据。</p>
</li>
<li>replay()<br><code>replay()</code>方法和 <code>publish()</code>一样,会返回一个 <code>ConnectableObservable</code>,区别是, <code>replay()</code>会为新的subscriber重放他之前所收到的上游数据,我们再来举个例子:</li>
</ol>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div></pre></td><td class="code"><pre><div class="line"> //only replay 3 values</div><div class="line">Observable.interval(1, TimeUnit.SECONDS).replay(3).refCount();</div></pre></td></tr></table></figure>
<p><img src="http://upload-images.jianshu.io/upload_images/2417399-c0b0fde9b0c67894.gif?imageMogr2/auto-orient/strip" alt=""></p>
<p>果然,Subscriber2在subscribe时候,立即收到了之前已经错过的三个数据,然后继续接受后面的数据。<br>但是,这里有几点需要考虑:replay() 会缓存上游发过来的数据,所以并不需要担心重新生成新数据给新的 Subscriber。</p>
<ol>
<li>ReplayingShare()<br>其实ReplayingShare并不能算是ConnectableObservable的一个操作符,他是JakeWhaton的一个开源库,只有百来行。实现的功能是几乎和<code>replay(1).refCount()</code>差不多。但是如果中断 Conncection之后,重新开始subscribe,他仍然会给你一个重放他上一次的结果。 具体看图:<br><img src="http://upload-images.jianshu.io/upload_images/2417399-5de2bcd0d3aca667.gif?imageMogr2/auto-orient/strip" alt=""><br>我们看到和刚才的replay不同,即使两个Subscriber都 dispose, 重新开始仍然会接收到我们缓存过的一个数据。</li>
</ol>
<h5 id="2-3-Subjects"><a href="#2-3-Subjects" class="headerlink" title="2.3 Subjects"></a>2.3 Subjects</h5><p>Subjects 作为一个Reactive世界中的特殊存在,他特殊在于他自己既是一个Observable又是一个Observer(Subscriber)。你既可以像普通Observable一样让别的Subscriber来订阅,也可以用Subjects来订阅别人。更方便的是他甚至暴露了OnXX(),方法给你。你直接调用可以通知所有的Subscriber。 这也是RxBus的基础,RxBus几乎离不开Subjects。 蜘蛛侠的老爹告诉我们,力量越大,责任就也大。Subjects也一样。 Subjects因为暴露了OnXX()方法,使得Subjects的数据来源变得难以控制。而且,Subjects一直是HotObservable,我们来看下Subject的<code>OnNext()</code>方法的实现:<br><figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div><div class="line">5</div><div class="line">6</div><div class="line">7</div><div class="line">8</div><div class="line">9</div><div class="line">10</div><div class="line">11</div><div class="line">12</div><div class="line">13</div></pre></td><td class="code"><pre><div class="line">@Override</div><div class="line">public void onNext(T t) {</div><div class="line"> if (subscribers.get() == TERMINATED) {</div><div class="line"> return;</div><div class="line"> }</div><div class="line"> if (t == null) {</div><div class="line"> onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));</div><div class="line"> return;</div><div class="line"> }</div><div class="line"> for (PublishDisposable<T> s : subscribers.get()) {</div><div class="line"> s.onNext(t);</div><div class="line"> }</div><div class="line">}</div></pre></td></tr></table></figure></p>
<p>可以看出来Subjects只要调用了<code>OnNext()</code>方法就会立即发送数据。所以,使用时一定要注意Subjects和Subscriber的链接时序问题。具体Subjects的用法我想介绍帖子已经足够多了。这里就不赘述了。</p>
<h4 id="3-在Android中常见的几种封装和注意事项"><a href="#3-在Android中常见的几种封装和注意事项" class="headerlink" title="3. 在Android中常见的几种封装和注意事项"></a>3. 在Android中常见的几种封装和注意事项</h4><h5 id="1-封装View-的Listener"><a href="#1-封装View-的Listener" class="headerlink" title="1.封装View 的Listener"></a>1.封装View 的Listener</h5><p>View 的各种Listener 我们常用create方法来封装,比如OnClickListener:<br><figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div></pre></td><td class="code"><pre><div class="line">Observable.create(emitter -> {</div><div class="line"> button.setOnClickListener(v -> emitter.onNext("I'm Clicked"));</div><div class="line"> emitter.setCancellable(() -> button.setOnClickListener(null));</div><div class="line">});</div></pre></td></tr></table></figure></p>
<p>这里非常关键的一点是一定要设置解除绑定,否则你将持续使用这个会造成内存泄漏。而且最好配合使用share()。否则只有最后一个Subscriber能收到OnClick。当然,如果不考虑方法数的话,推荐配合使用RxBinding。</p>
<p>而且,用create()方法封装Listener适合几乎所有的callback, 并且安全。</p>
<h5 id="2-封装简单的数据源"><a href="#2-封装简单的数据源" class="headerlink" title="2.封装简单的数据源"></a>2.封装简单的数据源</h5><p>设想一个场景,我们有一个User类。里面有我们的用户名,头像,各种信息。然而在我们的app中,可能有三四个Fragment/Activity需要根据这个User做出不同的反应。这时我们就可以简单的使用Subject来封装User类。</p>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div><div class="line">5</div><div class="line">6</div><div class="line">7</div><div class="line">8</div><div class="line">9</div><div class="line">10</div><div class="line">11</div><div class="line">12</div><div class="line">13</div><div class="line">14</div><div class="line">15</div><div class="line">16</div><div class="line">17</div><div class="line">18</div><div class="line">19</div></pre></td><td class="code"><pre><div class="line">public class UserRepository {</div><div class="line"> private User actualUser;</div><div class="line"></div><div class="line"> private Subject<User> subject = ReplaySubject.createWithSize(1);</div><div class="line"></div><div class="line"> /**</div><div class="line"> *</div><div class="line"> *Get User Data from wherever you want Network/Database etc</div><div class="line"> */</div><div class="line"></div><div class="line"> public Observable<User> getUpdate(){</div><div class="line"> return subject;</div><div class="line"> }</div><div class="line"></div><div class="line"> public void updateUser(User user){</div><div class="line"> actualUser = user;</div><div class="line"> subject.onNext(actualUser);</div><div class="line"> }</div><div class="line">}</div></pre></td></tr></table></figure>
<p>如果我们某些模块需要这个User,那么只需要subscribe到这个Repository,如果User有更新,每一个Subscriber都会收到更新后的User并且互相不影响。而且我们使用ReplaySubject,即使有新的Subscriber,也会收到最新的一个Subject。<br>但是使用的时候一定要注意,因为用的是Subject.所以在onNext方法中一旦出现了error。那么所有的Subscriber都将和这个subject断开了链接。这里也可以用RxRelay代替Subject,简单来说Relay就是一个没有onError和onComplete的Subject。</p>
<h5 id="3-简单的使用concat-first-来处理多来源"><a href="#3-简单的使用concat-first-来处理多来源" class="headerlink" title="3.简单的使用concat().first()来处理多来源"></a>3.简单的使用concat().first()来处理多来源</h5><p>Dan Lew在他的博客<a href="http://blog.danlew.net/2015/06/22/loading-data-from-multiple-sources-with-rxjava/" target="_blank" rel="external">Loading data from multiple sources with RxJava
</a>中介绍过他这种处理方法,</p>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div><div class="line">5</div><div class="line">6</div><div class="line">7</div><div class="line">8</div><div class="line">9</div></pre></td><td class="code"><pre><div class="line">// Our sources (left as an exercise for the reader)</div><div class="line">Observable<Data> memory = ...; </div><div class="line">Observable<Data> disk = ...; </div><div class="line">Observable<Data> network = ...;</div><div class="line"></div><div class="line">// Retrieve the first source with data</div><div class="line">Observable<Data> source = Observable </div><div class="line"> .concat(memory, disk, network)</div><div class="line"> .first();</div></pre></td></tr></table></figure>
<p>然后在每次做不同请求的时候刷新缓存</p>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div><div class="line">5</div><div class="line">6</div><div class="line">7</div><div class="line">8</div></pre></td><td class="code"><pre><div class="line">Observable<Data> networkWithSave = network.doOnNext(data -> { </div><div class="line"> saveToDisk(data);</div><div class="line"> cacheInMemory(data);</div><div class="line">});</div><div class="line"></div><div class="line">Observable<Data> diskWithCache = disk.doOnNext(data -> { </div><div class="line"> cacheInMemory(data);</div><div class="line">});</div></pre></td></tr></table></figure>
<p>具体也可以看这篇简书,我也不在过多赘述 <a href="http://www.jianshu.com/p/d5c43a10250b" target="_blank" rel="external">:RxJava(八)concat符操作处理多数据源</a></p>
<h5 id="4-自己继承Observable-手动写subscribeActual-方法"><a href="#4-自己继承Observable-手动写subscribeActual-方法" class="headerlink" title="4.自己继承Observable 手动写subscribeActual()方法"></a>4.自己继承Observable 手动写subscribeActual()方法</h5><p>这可能是最灵活的写法?如果你想用RxJava封装自己的库,推荐这种方法封装。因为这样不仅仅可以有效的进行错误处理,并且不会暴露过多逻辑给外面,许多优秀的RxJava相关库都是这样封装,就连RxJava自己也是把一个个的operator封装成一个个不同的Observable。但是这种方法确实要求很高,要做很多考虑,比如异步,多线程冲突,错误处理。对新手不是很推荐。</p>
</div>
<footer class="article-footer">
<a data-url="http://yoursite.com/2017/04/09/rxjava-3/" data-id="cj2524lso000358w13txls6pz" class="article-share-link">Delen</a>
<ul class="article-tag-list"><li class="article-tag-list-item"><a class="article-tag-list-link" href="/tags/Android/">Android</a></li><li class="article-tag-list-item"><a class="article-tag-list-link" href="/tags/RxJava/">RxJava</a></li></ul>
</footer>
</div>
</article>
<article id="post-rxjava-2" class="article article-type-post" itemscope itemprop="blogPost">
<div class="article-meta">
<a href="/2017/04/09/rxjava-2/" class="article-date">
<time datetime="2017-04-09T16:32:37.000Z" itemprop="datePublished">2017-04-09</time>
</a>
</div>
<div class="article-inner">
<header class="article-header">
<h1 itemprop="name">
<a class="article-title" href="/2017/04/09/rxjava-2/">放弃RxBus,拥抱RxJava(二):Observable究竟如何封装数据?</a>
</h1>
</header>
<div class="article-entry" itemprop="articleBody">
<p>上篇简单讲到了一些关于<em>Event/Rx bus</em>的优缺点。并且提到了如何“正确”使用RxJava,而不是使用RxBus来自己重新发明轮子。</p>
<blockquote>
<p><a href="https://wbinarytree.github.io/2017/04/09/RxJava-1/" target="_blank" rel="external">放弃RxBus,拥抱RxJava(一):为什么避免使用EventBus/RxBus</a></p>
</blockquote>
<p>其中也讲到了一个简单使用 <em>create()</em> 方法来进行封装Observable。但也留下了许多坑,比如内存泄漏,不能Multicast(多个Subscriber订阅同一个Observable) 等问题。所以这篇,我们接着通过这个例子,来具体了解下,如何封装Observable。</p>
<h4 id="1-Observable提供的静态方法都做了什么?"><a href="#1-Observable提供的静态方法都做了什么?" class="headerlink" title="1. Observable提供的静态方法都做了什么?"></a>1. Observable提供的静态方法都做了什么?</h4><p>首先我们来简单看一下Observable的静态方法,just/from/create都怎么为你提供Observable。<br>我们先看just:<br><figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div></pre></td><td class="code"><pre><div class="line">public static <T> Observable<T> just(T item) {</div><div class="line"> ObjectHelper.requireNonNull(item, "The item is null");</div><div class="line"> return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));</div><div class="line">}</div></pre></td></tr></table></figure></p>
<p>我们暂时不需要纠结 RxJavaPlugins.onAssembly() 这个方法。比较重要的是 just(T item) 方法会为你提供一个 ObservableJust<t>(item) 的实例,而这个 ObservableJust 类,就是一个RxJava内部的实现类。<br>在 RxJava 2.x 中 Observable 是一个抽象类,只有一个抽象方法,subscribeActual(Observer observer);(但是Observable的源码足足有13518行!!!)<br><figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div><div class="line">5</div><div class="line">6</div><div class="line">7</div></pre></td><td class="code"><pre><div class="line">public abstract class Observable<T> implements ObservableSource<T>{</div><div class="line"> //implemented methods</div><div class="line"></div><div class="line"> protected abstract void subscribeActual(Observer<? super T> observer);</div><div class="line"></div><div class="line"> //other implements/operators</div><div class="line">}</div></pre></td></tr></table></figure></t></p>
<p>那么ObservableJust这个类究竟什么样呢?<br><figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div><div class="line">5</div><div class="line">6</div><div class="line">7</div><div class="line">8</div><div class="line">9</div><div class="line">10</div><div class="line">11</div><div class="line">12</div><div class="line">13</div><div class="line">14</div><div class="line">15</div><div class="line">16</div><div class="line">17</div><div class="line">18</div><div class="line">19</div></pre></td><td class="code"><pre><div class="line">public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {</div><div class="line"></div><div class="line"> private final T value;</div><div class="line"> public ObservableJust(final T value) {</div><div class="line"> this.value = value;</div><div class="line"> }</div><div class="line"></div><div class="line"> @Override</div><div class="line"> protected void subscribeActual(Observer<? super T> s) {</div><div class="line"> ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);</div><div class="line"> s.onSubscribe(sd);</div><div class="line"> sd.run();</div><div class="line"> }</div><div class="line"></div><div class="line"> @Override</div><div class="line"> public T call() {</div><div class="line"> return value;</div><div class="line"> }</div><div class="line">}</div></pre></td></tr></table></figure></p>
<p>我们首先看到构造方法里,直接把value赋给了ObservableJust的成员。这也就是为什么Observable.just()里的代码会直接运行,而不是像create()方法,有Subscriber时候才能运行。<br>再来看看两个item的just(T item1,T item2):<br><figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div><div class="line">5</div><div class="line">6</div></pre></td><td class="code"><pre><div class="line">public static <T> Observable<T> just(T item1, T item2) {</div><div class="line"> ObjectHelper.requireNonNull(item1, "The first item is null");</div><div class="line"> ObjectHelper.requireNonNull(item2, "The second item is null");</div><div class="line"></div><div class="line"> return fromArray(item1, item2);</div><div class="line">}</div></pre></td></tr></table></figure></p>
<p>诶?怎么画风突变?不是ObservableJust了?其实除了只有一个item的just,其他的just方法也都是调用了这个fromArray。那我们来看看这个fromArray:<br><figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div><div class="line">5</div><div class="line">6</div><div class="line">7</div><div class="line">8</div><div class="line">9</div><div class="line">10</div></pre></td><td class="code"><pre><div class="line">public static <T> Observable<T> fromArray(T... items) {</div><div class="line"> ObjectHelper.requireNonNull(items, "items is null");</div><div class="line"> if (items.length == 0) {</div><div class="line"> return empty();</div><div class="line"> } else</div><div class="line"> if (items.length == 1) {</div><div class="line"> return just(items[0]);</div><div class="line"> }</div><div class="line"> return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));</div><div class="line">}</div></pre></td></tr></table></figure></p>
<p>前面一些check我们忽略,这里我们发现一些熟悉的身影了ObservableFromArray<t>(items)。又一个Observable的实现类。<br><figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div><div class="line">5</div><div class="line">6</div><div class="line">7</div><div class="line">8</div><div class="line">9</div><div class="line">10</div><div class="line">11</div><div class="line">12</div><div class="line">13</div><div class="line">14</div><div class="line">15</div><div class="line">16</div><div class="line">17</div><div class="line">18</div><div class="line">19</div><div class="line">20</div><div class="line">21</div><div class="line">22</div></pre></td><td class="code"><pre><div class="line">public final class ObservableFromArray<T> extends Observable<T> {</div><div class="line"> final T[] array;</div><div class="line"> public ObservableFromArray(T[] array) {</div><div class="line"> this.array = array;</div><div class="line"> }</div><div class="line"> @Override</div><div class="line"> public void subscribeActual(Observer<? super T> s) {</div><div class="line"> FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);</div><div class="line"></div><div class="line"> s.onSubscribe(d);</div><div class="line"></div><div class="line"> if (d.fusionMode) {</div><div class="line"> return;</div><div class="line"> }</div><div class="line"></div><div class="line"> d.run();</div><div class="line"> }</div><div class="line"></div><div class="line"> static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {</div><div class="line"> //implements</div><div class="line"> }</div><div class="line">}</div></pre></td></tr></table></figure></t></p>
<p>是不是更熟悉?其实Observable几乎所有的静态方法都是这样,甚至包括一些著名的RxJava库比如RxBinding,也都是使用这种封装方法。内部实现Observable的subscribeActual()方法。对外只提供静态方法来为你生成Observable。为什么这么做,我们来了解一下subscribeActual()方法。</p>
<h4 id="2-subscribeActual-究竟是什么?"><a href="#2-subscribeActual-究竟是什么?" class="headerlink" title="2. subscribeActual() 究竟是什么?"></a>2. subscribeActual() 究竟是什么?</h4><p>subscribeActual()其实就是Observable和Observer沟通的桥梁。这个Observer(Subscriber)就是你在Observable.subscribe()方法里写的那个类,或者是Consumer(只处理onNext方法)。<br><figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div><div class="line">5</div><div class="line">6</div><div class="line">7</div><div class="line">8</div><div class="line">9</div><div class="line">10</div><div class="line">11</div><div class="line">12</div></pre></td><td class="code"><pre><div class="line">public final void subscribe(Observer<? super T> observer) {</div><div class="line"> ObjectHelper.requireNonNull(observer, "observer is null");</div><div class="line"> try {</div><div class="line"> observer = RxJavaPlugins.onSubscribe(this, observer);</div><div class="line"></div><div class="line"> ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");</div><div class="line"></div><div class="line"> subscribeActual(observer);</div><div class="line"> } catch (NullPointerException e) { // NOPMD</div><div class="line"> } catch (Throwable e) {</div><div class="line"> }</div><div class="line">}</div></pre></td></tr></table></figure></p>
<p>我们看到其实这个方法除了Check和Apply就只有这一行subscribeActual(observer),连接了Observable和Observer。所以我们知道了,subscribeActual()方法里的代码,只有在subscribe()调用后,才回调用。</p>
<p>那么他们是如何链接的呢?其实很简单,根据你的逻辑一句一句的调用observer.onXX()方法就可以了。比如刚才我们看到的ObservableJust:<br><figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div><div class="line">5</div><div class="line">6</div><div class="line">7</div><div class="line">8</div><div class="line">9</div><div class="line">10</div></pre></td><td class="code"><pre><div class="line">@Override</div><div class="line">public void run() {</div><div class="line"> if (get() == START && compareAndSet(START, ON_NEXT)) {</div><div class="line"> observer.onNext(value);</div><div class="line"> if (get() == ON_NEXT) {</div><div class="line"> lazySet(ON_COMPLETE);</div><div class="line"> observer.onComplete();</div><div class="line"> }</div><div class="line"> }</div><div class="line">}</div></pre></td></tr></table></figure></p>
<p>再比如我们的ObservableFromArray:<br><figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div><div class="line">5</div><div class="line">6</div><div class="line">7</div><div class="line">8</div><div class="line">9</div><div class="line">10</div><div class="line">11</div><div class="line">12</div><div class="line">13</div><div class="line">14</div><div class="line">15</div><div class="line">16</div></pre></td><td class="code"><pre><div class="line">void run() {</div><div class="line"> T[] a = array;</div><div class="line"> int n = a.length;</div><div class="line"></div><div class="line"> for (int i = 0; i < n && !isDisposed(); i++) {</div><div class="line"> T value = a[i];</div><div class="line"> if (value == null) {</div><div class="line"> actual.onError(new NullPointerException("The " + i + "th element is null"));</div><div class="line"> return;</div><div class="line"> }</div><div class="line"> actual.onNext(value);</div><div class="line"> }</div><div class="line"> if (!isDisposed()) {</div><div class="line"> actual.onComplete();</div><div class="line"> }</div><div class="line">}</div></pre></td></tr></table></figure></p>
<p>复杂点的例子,比如如何封装button的OnClick事件:<br><figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div><div class="line">5</div><div class="line">6</div><div class="line">7</div><div class="line">8</div><div class="line">9</div><div class="line">10</div><div class="line">11</div><div class="line">12</div><div class="line">13</div><div class="line">14</div><div class="line">15</div><div class="line">16</div><div class="line">17</div><div class="line">18</div><div class="line">19</div><div class="line">20</div><div class="line">21</div><div class="line">22</div><div class="line">23</div><div class="line">24</div><div class="line">25</div><div class="line">26</div><div class="line">27</div><div class="line">28</div><div class="line">29</div></pre></td><td class="code"><pre><div class="line">@Override protected void subscribeActual(Observer<? super Object> observer) {</div><div class="line"> if (!checkMainThread(observer)) {</div><div class="line"> return;</div><div class="line"> }</div><div class="line"> Listener listener = new Listener(view, observer);</div><div class="line"> observer.onSubscribe(listener);</div><div class="line"> view.setOnClickListener(listener);</div><div class="line">}</div><div class="line"></div><div class="line">static final class Listener extends MainThreadDisposable implements OnClickListener {</div><div class="line"> private final View view;</div><div class="line"> private final Observer<? super Object> observer;</div><div class="line"></div><div class="line"> Listener(View view, Observer<? super Object> observer) {</div><div class="line"> this.view = view;</div><div class="line"> this.observer = observer;</div><div class="line"> }</div><div class="line"></div><div class="line"> @Override public void onClick(View v) {</div><div class="line"> if (!isDisposed()) {</div><div class="line"> observer.onNext(Notification.INSTANCE);</div><div class="line"> }</div><div class="line"> }</div><div class="line"></div><div class="line"> @Override protected void onDispose() {</div><div class="line"> view.setOnClickListener(null);</div><div class="line"> }</div><div class="line"> }</div><div class="line">}</div></pre></td></tr></table></figure></p>
<p>但是细心的同学应该看到了,每个subscribeActual()方法里,都会有 observer.onSubscribe(disposable)这句。那么这句又是做什么的呢?<br>Disposable其实就是控制你取消订阅的。他只有两个方法 dispose() 取消订阅,和 isDisposed() 来通知是否已经取消了订阅。<br>取消订阅时,要根据需求释放资源。在subscribeActual()里逻辑要严谨,比如onComplete()之后不要有onNext()。需要注意的点很多,所以可能这也就是为什么RxJava推荐用户使用静态方法生成Observable吧。</p>
<p>最后再说一下几点:</p>
<ul>
<li>create()方法:其实create()方法相当于你把subscribeActual中的代码,写到了create里而已。所以有很高的操控性。</li>
<li>Flowable:Floawble其实在实现上和Observable类似,区别是Observable同过 Disposable控制取消订阅。而Flowable同过Subscription。其中还需要request()方法控制流量。具体关于这个问题,我推荐这篇文章<blockquote>
<p><a href="http://www.jianshu.com/p/36e0f7f43a51" target="_blank" rel="external">给初学者的RxJava2.0教程</a></p>
</blockquote>
</li>
</ul>
<p>总结:</p>
<ul>
<li>我们从源码分析角度来说,RxJava 2.x 也是同过subscribeActual来链接Observable和Observer(Subscriber)。本质上和Listener没什么太大区别。但是,RxJava的确是诸多一线Java/Android开发者的结晶。丰富的操作符,线程调度等等诸多优势。而且保证类型安全。<br>如果你还是停留在仅仅使用RxJava来实现一个RxBus,是不是有点杀鸡用牛刀呢?</li>
</ul>
</div>
<footer class="article-footer">
<a data-url="http://yoursite.com/2017/04/09/rxjava-2/" data-id="cj2524ltk000c58w1wb56yqa5" class="article-share-link">Delen</a>
<ul class="article-tag-list"><li class="article-tag-list-item"><a class="article-tag-list-link" href="/tags/Android/">Android</a></li><li class="article-tag-list-item"><a class="article-tag-list-link" href="/tags/RxJava/">RxJava</a></li></ul>
</footer>
</div>
</article>
<article id="post-RxJava-1" class="article article-type-post" itemscope itemprop="blogPost">
<div class="article-meta">
<a href="/2017/04/09/RxJava-1/" class="article-date">
<time datetime="2017-04-09T16:25:56.000Z" itemprop="datePublished">2017-04-09</time>
</a>
</div>
<div class="article-inner">
<header class="article-header">
<h1 itemprop="name">
<a class="article-title" href="/2017/04/09/RxJava-1/">放弃RxBus,拥抱RxJava(一):为什么避免使用EventBus/RxBus</a>
</h1>
</header>
<div class="article-entry" itemprop="articleBody">
<p>EventBus和Otto在之前作为Android组件间通信工具,简单方便十分受欢迎,但是也非常容易Abuse。大概有如下几个缺点:</p>
<ul>
<li>由于是Event,在发布Event的时候就要做好准备可能并没有人接受这个Event, Subscribe的时候也要做好准备可能永远不会收到Event。Event无论顺序还是时间上都某种程度上不太可控。如果你将数据寄托在Event上然后就直接在Android其他生命周期方法中直接使用这个数据或成员变量。那么很有可能你会得到NPE。</li>
<li>EventBus看似将你的程序解耦,但是又有些过了。我们常常使用EventBus传数据,这已经是Dependency级别的数据而不是一个可以被解耦出来的模块。这样就造成了过多EventBus的代码会造成代码结构混乱,难以测试和追踪,违背了解耦的初衷。这时如果有意或无意的造成了Nested Event。那情况会更糟。</li>
</ul>
<p>由于EventBus的种种缺点,以及后面RxJava的出现。很多人都开始使用RxJava来取代EventBus。甚至Otto的官方介绍里都写到:</p>
<blockquote>
<h2 id="Deprecated"><a href="#Deprecated" class="headerlink" title="Deprecated!"></a>Deprecated!</h2><p>This project is deprecated in favor of <a href="https://github.com/ReactiveX/RxJava" target="_blank" rel="external">RxJava</a> and<br><a href="https://github.com/ReactiveX/RxAndroid" target="_blank" rel="external">RxAndroid</a>. These projects permit the same event-driven<br>programming model as Otto, but they’re more capable and offer better control of threading.</p>
<p>If you’re looking for guidance on migrating from Otto to Rx, <a href="http://blog.kaush.co/2014/12/24/implementing-an-event-bus-with-rxjava-rxbus/" target="_blank" rel="external">this post</a><br>is a good start.</p>
</blockquote>
<p>链接是一个教你怎么使用RxJava来自己手动写一个RxBus来代替EventBus的文章。虽然看起来是在用RxJava。但是实际上却仍然在用EventBus。甚至这个封装其实也并没有GreenRobot或者Otto来的好。<br>我们看看Jake Wharton对RxBus的评价:<br><img src="http://upload-images.jianshu.io/upload_images/2417399-ef00941c1cfd7ace.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240" alt=""><br><em>我想”RxBus”唯一的好处就是他是一个Rx的入门毒品。否则的话,你要么就不是在用Rx,要么你需要更加惯用的Rx资源</em> (渣翻译见谅)</p>
<p>再来一个GitHub的:<br><img src="http://upload-images.jianshu.io/upload_images/2417399-7608f7d805c23783.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240" alt=""></p>
<p>subscribeActual部分我们先不考虑。然而Jake指出最好不要使用Relay来“重新发明”Event Bus.</p>
<p>这里看图说话:<br>Jake Wharton在GOTO 2016 上的讲座中提到,我们正常的Android编程是这样的:<br><img src="http://upload-images.jianshu.io/upload_images/2417399-5612eee4ee9a61d2.PNG?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240" alt=""><br>我们像一个中间人一样。<br>而使用RxJava。 我们的结构,更像这样<br><img src="http://upload-images.jianshu.io/upload_images/2417399-00cf6db34e8b4c84.PNG?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240" alt=""><br>我们使用RxJava来直接把组件相连,对所接受到的数据作出反应,所谓的 “Reactive”。<br>而使用Eventbus? Jake 没说, 我自己画一个:</p>
<p><img src="http://upload-images.jianshu.io/upload_images/2417399-6da08e095b8d0899.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240" alt=""><br>我们作为一个中间人,传递消息。EventBus作为另一个中间人。帮我们传递消息。(这也就是所谓的“看似解耦”)</p>
<p>再打个比方,虽然我们将EventBus翻译成时间总线,但是其实总线就是Bus也就是公交车。而RxJava更像一个专车,Uber或者滴滴。他直接链接你的两个或多个需要通信的类。传输数据,当然你可以做一个很大的专车,穿梭在所有类之间,也就是所谓的RxBus。所以在这里为什么放弃RxBus也就不言而喻了不是?</p>
<p>那么,问题来了?</p>
<h4 id="怎样才是正确(正常?)的RxJava使用方式?"><a href="#怎样才是正确(正常?)的RxJava使用方式?" class="headerlink" title="怎样才是正确(正常?)的RxJava使用方式?"></a>怎样才是正确(正常?)的RxJava使用方式?</h4><p>其实Jake 也在GitHub的讨论上给出了一个答案:<br><img src="http://upload-images.jianshu.io/upload_images/2417399-379766ad920c95ee.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240" alt=""></p>
<p>所以应该是,每当你想发布一个Event在EventBus时,直接暴露一个Observable出来。每当你想接受一个Event时,找到这个Observable并且Subscribe他。</p>
<p>这样做的好处是什么?</p>
<ul>
<li>目标和地点都很明确。你的Subscriber明确的知道他Subscribe的是谁,而且明确的知道我需要作出什么反应。这也正是RxJava的核心“响应式编程”。</li>
<li>由于使用了Observable,对于异常处理将会非常方便。而且还有功能强大全面的Operator来辅助你。</li>
<li>虽然看起来耦合性有所增加。但是这是必要的,上面也说过,EventBus虽然在代码上看似解耦。其实他们还是联系在一起的。而我们这样直接暴露Observable给需要的其他类,这完成了1 -> 1/N的链接,而不需要EventBus这个中间人来传递消息/事件,而且保证我们需要的事件一定会直接到达。</li>
</ul>
<p>####我们来举个例子</p>
<p><img src="http://upload-images.jianshu.io/upload_images/2417399-810562495275f259.PNG?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240" alt=""></p>
<p>上下两个Fragment,上面的一个EditText,下面的一个TextView。上面的EditText变化的时候下面的TextView也跟着变化。</p>
<p>先把EditText的TextChangedListener封装在Observable里:<br><figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div><div class="line">5</div><div class="line">6</div><div class="line">7</div><div class="line">8</div><div class="line">9</div><div class="line">10</div><div class="line">11</div><div class="line">12</div><div class="line">13</div><div class="line">14</div><div class="line">15</div><div class="line">16</div><div class="line">17</div><div class="line">18</div><div class="line">19</div><div class="line">20</div><div class="line">21</div><div class="line">22</div><div class="line">23</div><div class="line">24</div></pre></td><td class="code"><pre><div class="line"> stringObservable = Observable.create(e -> editText.addTextChangedListener(new TextWatcher() {</div><div class="line"> @Override</div><div class="line"> public void beforeTextChanged(CharSequence s, int start, int count, int after) {</div><div class="line"></div><div class="line"> }</div><div class="line"></div><div class="line"> @Override</div><div class="line"> public void onTextChanged(CharSequence s, int start, int before, int count) {</div><div class="line"> e.onNext(s.toString());</div><div class="line"> }</div><div class="line"></div><div class="line"> @Override</div><div class="line"> public void afterTextChanged(Editable s) {</div><div class="line"></div><div class="line"> }</div><div class="line"> }));</div><div class="line"></div><div class="line">/**</div><div class="line">***</div><div class="line">*/</div><div class="line"> //Expose Observable</div><div class="line"> public Observable<String> getEditTextObservable() {</div><div class="line"> return stringObservable;</div><div class="line"> }</div></pre></td></tr></table></figure></p>
<p>不习惯自己封装可以使用RxBinding :<br><figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div></pre></td><td class="code"><pre><div class="line">stringObservable = RxTextView.textChangeEvents(editText)</div><div class="line"> .map(event -> event.text().toString());</div></pre></td></tr></table></figure></p>
<p>再从我们的TextViewFragment中 取到这个封装好的Observable:<br><figure class="highlight plain"><table><tr><td class="gutter"><pre><div class="line">1</div><div class="line">2</div><div class="line">3</div><div class="line">4</div><div class="line">5</div><div class="line">6</div><div class="line">7</div><div class="line">8</div></pre></td><td class="code"><pre><div class="line">@Override</div><div class="line">public void onStart() {</div><div class="line"> super.onStart();</div><div class="line"> FragmentEditText fragment = (FragmentEditText) getFragmentManager().findFragmentByTag(FragmentEditText.TAG);</div><div class="line"> if(fragment != null){</div><div class="line"> fragment.getStringObservable().subscribe(s -> textView.setText(s));</div><div class="line"> }</div><div class="line">}</div></pre></td></tr></table></figure></p>
<p>来看看效果:</p>
<p><img src="http://upload-images.jianshu.io/upload_images/2417399-f008642da01d310f.gif?imageMogr2/auto-orient/strip" alt=""></p>
<p>当然,这里还有个问题</p>
<ul>
<li>由于我们将editText封装在Observable里,无论是create()方法还是使用RxBinding,都会持有这个View的强引用。造成内存泄漏。所以我们一定要在最后加入dispose()方法来释放。所以我推荐使用RxBinding,他已经帮我们在dispose()方法里写好了解除Listener的方法。</li>
<li>因为并没有使用publish操作符,导致多个Subscriber的时候还是有些许问题。可以考虑直接加入.share().</li>
</ul>
<p>具体我们如何讲常用的数据/Callback封装到Observable中。我会在接下来的文章中写到。</p>
<blockquote>
<p> <a href="https://wbinarytree.github.io/2017/04/09/rxjava-2/" target="_blank" rel="external">第二篇链接</a></p>
</blockquote>
</div>
<footer class="article-footer">
<a data-url="http://yoursite.com/2017/04/09/RxJava-1/" data-id="cj2524lsl000258w104rp8jfh" class="article-share-link">Delen</a>
<ul class="article-tag-list"><li class="article-tag-list-item"><a class="article-tag-list-link" href="/tags/Android/">Android</a></li><li class="article-tag-list-item"><a class="article-tag-list-link" href="/tags/RxJava/">RxJava</a></li></ul>
</footer>
</div>
</article>
</section>
<aside id="sidebar">
<div class="widget-wrap">
<h3 class="widget-title">Labels</h3>
<div class="widget">
<ul class="tag-list"><li class="tag-list-item"><a class="tag-list-link" href="/tags/Android/">Android</a></li><li class="tag-list-item"><a class="tag-list-link" href="/tags/RxJava/">RxJava</a></li></ul>
</div>
</div>
<div class="widget-wrap">
<h3 class="widget-title">Tag Cloud</h3>
<div class="widget tagcloud">
<a href="/tags/Android/" style="font-size: 10px;">Android</a> <a href="/tags/RxJava/" style="font-size: 10px;">RxJava</a>
</div>
</div>
<div class="widget-wrap">
<h3 class="widget-title">Archieven</h3>
<div class="widget">
<ul class="archive-list"><li class="archive-list-item"><a class="archive-list-link" href="/archives/2017/04/">April 2017</a></li></ul>
</div>
</div>
<div class="widget-wrap">
<h3 class="widget-title">Recente berichten</h3>
<div class="widget">
<ul>
<li>
<a href="/2017/04/30/rxjava5/">拥抱RxJava(番外篇):关于RxJava的Tips & Tricks</a>
</li>
<li>
<a href="/2017/04/23/rxjava4/">拥抱RxJava(四):动手做一个Full Rx的 注册界面</a>
</li>
<li>
<a href="/2017/04/09/rxjava-3/">拥抱RxJava(三):关于Observable的冷热,常见的封装方式以及误区</a>
</li>
<li>
<a href="/2017/04/09/rxjava-2/">放弃RxBus,拥抱RxJava(二):Observable究竟如何封装数据?</a>
</li>
<li>
<a href="/2017/04/09/RxJava-1/">放弃RxBus,拥抱RxJava(一):为什么避免使用EventBus/RxBus</a>
</li>
</ul>
</div>
</div>
</aside>
</div>
<footer id="footer">
<div class="outer">
<div id="footer-info" class="inner">
© 2017 BinaryTree WANG<br>
Powered by <a href="http://hexo.io/" target="_blank">Hexo</a>
</div>
</div>
</footer>
</div>
<nav id="mobile-nav">
<a href="/" class="mobile-nav-link">Home</a>
<a href="/archives" class="mobile-nav-link">Archives</a>
</nav>
<script src="//ajax.googleapis.com/ajax/libs/jquery/2.0.3/jquery.min.js"></script>
<link rel="stylesheet" href="/fancybox/jquery.fancybox.css">
<script src="/fancybox/jquery.fancybox.pack.js"></script>
<script src="/js/script.js"></script>
</div>
</body>
</html>