聊一聊 Java-协程 那些事 [复制链接]

2019-1-10 18:53
Mob开发者平台 阅读:629 评论:0 赞:1
Tag:  

什么是协程

大多数的开发人员可能对进程,线程这两个名字比较熟悉。但是为了追求最大力度的发挥硬件的性能和提升软件的速度,出现了协程或者叫纤程(Fiber),或者绿色线程(GreenThread)。那我们来聊下什么是协程,以及在java中是怎么体现和运用协程的。

在说协程之前,我们先来回想下,现在大多数的程序中,都是使用了多线程技术来解决一些需要长时间阻塞的场景。JAVA中每个线程栈默认1024K,没有办法开成千上万个线程,而且就算通过JVM参数调小,CPU也无法分配时间片给每个线程,大多数的线程还是在等待中,所以我们一般会使用

Runtime.getRuntime().availableProcessors()来配置线程数的大小(或者会根据实际情况调整,就不展开讨论了),但是就算是我们开了新的线程,该线程也可能是在等待系统IO的返回或者网络IO的返回,而且线程的切换有着大量的开销。

为了解决上面说的问题,大家可能会想到回调。现在很多框架都是基于回调来解决那些耗时的操作。但层数嵌套多了反而会引起反人类的回调地狱,并且回调后就丢失原函数的上下文。其中的代表呢就比如说nodeJs。

终于可以来聊聊协程。它的基本原理是:在某个点挂起当前的任务,并且保存栈信息,去执行另一个任务;等完成或达到某个条件时,在还原原来的栈信息并继续执行。上面提到的几个点大家会想到JVM的结构,栈, 程序计数器等等,但是JVM原生是不支持这样的操作的(至少java是不支持的,kotlin是可以的)。因此如果要在纯java代码里需要使用协程的话需要引入第三方包,如kilim,Quasar。而kilim已经很久未更新了,那么我们来看看Quasar。

Quasar原理

1、利用字节码增强,将普通的java代码转换为支持协程的代码。
2、在调用pausable方法的时候,如果pause了就保存当前方法栈的State,停止执行当前协程,将控制权交给调度器
3、调度器负责调度就绪的协程
4、协程resume的时候,自动恢复State,根据协程的pc计数跳转到上次执行的位置,继续执行。

这些第三方的框架大部分实现是一致的。通过对字节码直接操作,在编译期把你写的代码变为支持协程的版本,并在运行时把你所有需要用到协程的部分由他来控制和调度,同时也支持在运行期这样做。
Quasar中使用了抛异常的方式来中断线程,但是 实际上如果我们捕获了这个异常就会产生问题,所以一般是以这种方式来注册:

@Suspendable
public int f() {
 try {
   // do some stuff
   return g() * 2;
 } catch(SuspendExecution s) {
   //这里不应该捕获到异常.
   throw new AssertionError(s);
 }
}

在调度方面,Quasar中默认使用了JDK7以上才有的ForkJoinPool,它的优势就在于空闲的线程会去从其他线程任务队列尾部”偷取”任务来自己处理,因此也叫work-stealing功能。这个功能可以大大的利用CPU资源,不让线程白白空闲着。

Quasar模块

Fiber

Fiber可以认为是一个微线程,使用方式基本上和Thread相同,启动start:

new Fiber<V>() {

@Override
protected V run() throws SuspendExecution, InterruptedException {
       // your code
   }
}.start();

new Fiber<Void>(new SuspendableRunnable() {

public void run() throws SuspendExecution, InterruptedException {
   // your code
 }
}).start();  

其实它更类似于一个CallBack,是可以携带返回值的,并且可以抛异常SuspendExecution,InterruptedException。你也可以向其中传递SuspendableRunnable 或 SuspendableCallable 给Fiber的构造函数。你甚至可以像线程一样调用join(),或者get()来阻塞线程等待他完成。
当Fiber比较大的时候,Fiber可以在调用parkAndSerialize 方法时被序列化,在调用unparkSerialized时被反序列化。

从以上我们可以看出Fiber与Thread非常类似,极大的减少了迁移的成本。

FiberScheduler

FiberScheduler是Quasar框架中核心的任务调度器,负责管理任务的工作者线程WorkerThread,之前提到的他是一个FiberForkJoinScheduler。
ForkJoinPool的默认初始化个数为Runtime.getRuntime().availableProcessors()。

instrumentation

当一个类被加载时,Quasar的instrumentation模块 (使用 Java agent时) 搜索suspendable 方法。每一个suspendable 方法 f通过下面的方式 instrument:

它搜索对其它suspendable方法的调用。对suspendable方法g的调用,一些代码会在这个调用g的前后被插入,它们会保存和恢复fiber栈本地变量的状态,记录这个暂停点。在这个“suspendable function chain”的最后,我们会发现对Fiber.park的调用。park暂停这个fiber,扔出 SuspendExecution异常。

当g block的时候,SuspendExecution异常会被Fiber捕获。 当Fiber被唤醒(使用unpark), 方法f会被调用, 执行记录显示它被block在g的调用上,所以程序会立即跳到f调用g的那一行,然后调用它。最终我们会到达暂停点,然后继续执行。当g返回时, f中插入的代码会恢复f的本地变量。

过程听起来很复杂,但是它只会带来3% ~ 5%的性能的损失。

下面看一个简单的例子, 方法m2声明抛出SuspendExecution异常,方法m1调用m2和m3,所以也声明抛出这个异常,最后这个异常会被Fiber所捕获:

  public class Helloworld {
   static void m1() throws SuspendExecution, InterruptedException {
       String m = "m1";
       System.out.println("m1 begin");
       m = m2();
       m = m3();
       System.out.println("m1 end");
       System.out.println(m);
   }
   static String m2() throws SuspendExecution, InterruptedException {
       return "m2";
   }
   static String m3() throws SuspendExecution, InterruptedException {
       return "m3";
   }
   static public void main(String[] args) throws ExecutionException, InterruptedException {
       new Fiber<Void>("Caller", new SuspendableRunnable() {
           @Override
           public void run() throws SuspendExecution, InterruptedException {
               m1();
           }
       }).start();
   }
}

// 反编译后的代码
@Instrumented(suspendableCallSites={16, 17}, methodStart=13, methodEnd=21, methodOptimized=false)
static void m1()
  throws SuspendExecution, InterruptedException
{
  // Byte code:
  //   0: aconst_null
  //   1: astore_3
  //   2: invokestatic 88  co/paralleluniverse/fibers/Stack:getStack   ()Lco/paralleluniverse/fibers/Stack;
  //   5: dup
  //   6: astore_1
  //   7: ifnull +42 -> 49
  //   10: aload_1
  //   11: iconst_1
  //   12: istore_2
  //   13: invokevirtual 92    co/paralleluniverse/fibers/Stack:nextMethodEntry    ()I
  //   16: tableswitch default:+24->40, 1:+64->80, 2:+95->111
  //   40: aload_1
  //   41: invokevirtual 96    co/paralleluniverse/fibers/Stack:isFirstInStackOrPushed ()Z
  //   44: ifne +5 -> 49
  //   47: aconst_null
  //   48: astore_1
  //   49: iconst_0
  //   50: istore_2
  //   51: ldc 2
  //   53: astore_0
  //   54: getstatic 3 java/lang/System:out    Ljava/io/PrintStream;
  //   57: ldc 4
  //   59: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
  //   62: aload_1
  //   63: ifnull +26 -> 89
  //   66: aload_1
  //   67: iconst_1
  //   68: iconst_1
  //   69: invokevirtual 100   co/paralleluniverse/fibers/Stack:pushMethod (II)V
  //   72: aload_0
  //   73: aload_1
  //   74: iconst_0
  //   75: invokestatic 104    co/paralleluniverse/fibers/Stack:push   (Ljava/lang/Object;Lco/paralleluniverse/fibers/Stack;I)V
  //   78: iconst_0
  //   79: istore_2
  //   80: aload_1
  //   81: iconst_0
  //   82: invokevirtual 108   co/paralleluniverse/fibers/Stack:getObject  (I)Ljava/lang/Object;
  //   85: checkcast 110   java/lang/String
  //   88: astore_0
  //   89: invokestatic 6  com/colobu/fiber/Helloworld:m2  ()Ljava/lang/String;
  //   92: astore_0
  //   93: aload_1
  //   94: ifnull +26 -> 120
  //   97: aload_1
  //   98: iconst_2
  //   99: iconst_1
  //   100: invokevirtual 100  co/paralleluniverse/fibers/Stack:pushMethod (II)V
  //   103: aload_0
  //   104: aload_1
  //   105: iconst_0
  //   106: invokestatic 104   co/paralleluniverse/fibers/Stack:push   (Ljava/lang/Object;Lco/paralleluniverse/fibers/Stack;I)V
  //   109: iconst_0
  //   110: istore_2
  //   111: aload_1
  //   112: iconst_0
  //   113: invokevirtual 108  co/paralleluniverse/fibers/Stack:getObject  (I)Ljava/lang/Object;
  //   116: checkcast 110  java/lang/String
  //   119: astore_0
  //   120: invokestatic 7 com/colobu/fiber/Helloworld:m3  ()Ljava/lang/String;
  //   123: astore_0
  //   124: getstatic 3    java/lang/System:out    Ljava/io/PrintStream;
  //   127: ldc 8
  //   129: invokevirtual 5    java/io/PrintStream:println (Ljava/lang/String;)V
  //   132: getstatic 3    java/lang/System:out    Ljava/io/PrintStream;
  //   135: aload_0
  //   136: invokevirtual 5    java/io/PrintStream:println (Ljava/lang/String;)V
  //   139: aload_1
  //   140: ifnull +7 -> 147
  //   143: aload_1
  //   144: invokevirtual 113  co/paralleluniverse/fibers/Stack:popMethod  ()V
  //   147: return
  //   148: aload_1
  //   149: ifnull +7 -> 156
  //   152: aload_1
  //   153: invokevirtual 113  co/paralleluniverse/fibers/Stack:popMethod  ()V
  //   156: athrow
  // Line number table:
  //   Java source line #13    -> byte code offset #51
  //   Java source line #15    -> byte code offset #54
  //   Java source line #16    -> byte code offset #62
  //   Java source line #17    -> byte code offset #93
  //   Java source line #18    -> byte code offset #124
  //   Java source line #19    -> byte code offset #132
  //   Java source line #21    -> byte code offset #139
  // Local variable table:
  //   start   length  slot    name    signature
  //   53  83  0   m   String
  //   6   147 1   localStack  co.paralleluniverse.fibers.Stack
  //   12  99  2   i   int
  //   1   1   3   localObject Object
  //   156 1   4   localSuspendExecution   SuspendExecution
  // Exception table:
  //   from    to  target  type
  //   49  148 148 finally
  //   49  148 156 co/paralleluniverse/fibers/SuspendExecution
  //   49  148 156 co/paralleluniverse/fibers/RuntimeSuspendExecution
}    

我并没有更深入的去了解Quasar的实现细节以及调度算法,有兴趣的读者可以翻翻它的代码。

实战

<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.5.1</version>
<executions>
<execution>
<id>getClasspathFilenames</id>
<goals>
<goal>properties</goal>
</goals>
</execution>
</executions>
</plugin>
public class Helloworld {

   @Suspendable
   static void m1() throws InterruptedException, SuspendExecution {
       String m = "m1";
       //System.out.println("m1 begin");
       m = m2();
       //System.out.println("m1 end");
       //System.out.println(m);
   }
   static String m2() throws SuspendExecution, InterruptedException {
       String m = m3();
       Strand.sleep(1000);
       return m;
   }
   //or define in META-INF/suspendables
   @Suspendable
   static String m3() {
       List l = Stream.of(1,2,3).filter(i -> i%2 == 0).collect(Collectors.toList());
       return l.toString();
   }
   static public void main(String[] args) throws ExecutionException, InterruptedException {
       int count = 10000;
       testThreadpool(count);
       testFiber(count);
   }
   static void testThreadpool(int count) throws InterruptedException {
       final CountDownLatch latch = new CountDownLatch(count);
       ExecutorService es = Executors.newFixedThreadPool(200);
       LongAdder latency = new LongAdder();
       long t = System.currentTimeMillis();
       for (int i =0; i< count; i++) {
           es.submit(() -> {
               long start = System.currentTimeMillis();
               try {
                   m1();
               } catch (InterruptedException e) {
                   e.printStackTrace();
               } catch (SuspendExecution suspendExecution) {
                   suspendExecution.printStackTrace();
               }
               start = System.currentTimeMillis() - start;
               latency.add(start);
               latch.countDown();
           });
       }
       latch.await();
       t = System.currentTimeMillis() - t;
       long l = latency.longValue() / count;
       System.out.println("thread pool took: " + t + ", latency: " + l + " ms");
       es.shutdownNow();
   }
   static void testFiber(int count) throws InterruptedException {
       final CountDownLatch latch = new CountDownLatch(count);
       LongAdder latency = new LongAdder();
       long t = System.currentTimeMillis();
       for (int i =0; i< count; i++) {
           new Fiber<Void>("Caller", new SuspendableRunnable() {
               @Override
               public void run() throws SuspendExecution, InterruptedException {
                   long start = System.currentTimeMillis();
                   m1();
                   start = System.currentTimeMillis() - start;
                   latency.add(start);
                   latch.countDown();
               }
           }).start();
       }
       latch.await();
       t = System.currentTimeMillis() - t;
       long l = latency.longValue() / count;
       System.out.println("fiber took: " + t  + ", latency: " + l + " ms");
   }
}
 

OUTPUT:

1 thread pool took: 50341, latency: 1005 ms
2 fiber took: 1158, latency: 1000 ms

可以看到很明显的时间差距,存在多线程阻塞的情况下,协程的性能非常的好,但是。如果把sleep这段去掉,Fiber的性能反而更差:

这说明Fiber并不意味着它可以在所有的场景中都可以替换Thread。当fiber的代码经常会被等待其它fiber阻塞的时候,就应该使用fiber。

对于那些需要CPU长时间计算的代码,很少遇到阻塞的时候,就应该首选thread

扩展

其实协程这个概念在其他的语言中有原生的支持,如:
kotlin 1.30之后已经稳定:
https://www.kotlincn.net/docs/reference/coroutines-overview.html
golang:
https://gobyexample.com/goroutines
python:
http://www.gevent.org/contents.html
等等~
在这些语言中协程就看起来至少没这么奇怪或者难以理解了,而且开发起开也相比java简单很多。

总结

协程的概念也不算是很新了,但是在像Java这样的语言或者特定的领域并不是很火,也并没有完全普及。不是很明白是它的学习成本高,还是说应用场景是在太小了。但是当我听到这个概念的时候确实是挺好奇,也挺好奇的。也希望之后会有更多的框架和特性来简化我们苦逼程序员的开发~~

分享到:
我来说两句
facelist
您需要登录后才可以评论 登录 | 立即注册
所有评论(0)

领先的中文移动开发者社区
18620764416
7*24全天服务
意见反馈:1294855032@qq.com

扫一扫关注我们

Powered by Discuz! X3.2© 2001-2019 Comsenz Inc.( 粤ICP备15117877号 )