剑客
关注科技互联网

Akka源码分析(一)

1. 基础知识

1.1 ForkJoinPool

ForkJoinPool是Java 1.7之后新添加的一个ExecutorService实现,在java.util.concurrent中。和其他的ExecutorService一样,ForkJoinPool在提供自身特殊优势的同时也可以作为一个普通的Executor框架来使用,通过submit等方法来提交Runnable任务。

下面简要介绍一下ForkJoinPool的特殊之处和一些简要用法。作为Doug Lea老爷子的又一力作,ForkJoinPool可以说又为Java Concurrent添加了一元悍将。ForkJoinPool最大的特殊之处就在于其实现了工作密取( work-stealing )。简单的说,ForkJoinPool中维护着多个线程(一般为CPU核数)在不断地执行Task,每个线程除了执行自己职务内的Task之外,还会根据自己工作线程的闲置情况去获取其他繁忙的工作线程的Task,如此一来就能能够减少线程阻塞或是闲置的时间,提高CPU利用率。

下面简要讲解实现要点,具体可参考Doug Lea老先生的论文: A Java Fork/Join Framework 。ForkJoinPool中的工作线程是由ForkJoinWorkerThread类实现的,其通过维护一个双端队列(ForkJoinPool.WorkQueue)来存放Task的,这里的Task一般是ForkJoinTask的子类。每一个工作线程简单的通过以下两条原则进行活动:

  • 如果队列非空,则代表自己线程的Task还没执行完毕,取出Task并执行
  • 如果队列为空,则随机选取一个其他的工作线程的Task并执行

那么为了减少在对Task双端队列进行操作时的Race Condition,这里的双端队列通过维护一个top变量和一个base变量来解决这个问题。top变量类似于栈帧,当ForkJoinTask fork出新的Task或者Client从外部提交一个新的Task的ForkJoinPool时,工作线程将Task以LIFO的方式push到双端队列的队头,top维护队头的位置,可以简单理解为双端队列push的部分为一个栈。而base维护队列的队尾,当别的线程需要从本工作线程密取任务时,是从双端队列的队尾出取出任务。工作队列基于以下几个保证对队列进行操作:

  • push和pop操作只会被owner线程调用
  • 只有非owner线程会调用take操作
  • pop和take操作只有在队列将要变成空(当前只有一个元素)时才会需要处理同步问题

也就是说这个实现的双端队列将整体的同步问题转换为了一个two-party的同步问题,对于take而言我们只要提供一个简单的entry lock来保证所以其他线程的take的一致性,而对于自己owner线程的pop和push几乎不需要同步。

由于ForkJoinPool的这些特性,因此它除了适合用来实现分而治之的计算框架以外,还非常适合用来作为基于event的异步消息处理执行框架,而Akka正是将ForkJoinPool作为默认的底层ExcutorService。事实证明,ForkJoinPool在Akka这种基于消息传递的异步执行环境下能够展现出非常高的性能优势,前提是尽量减少在处理过程中的线程阻塞(如IO等待等等)。

2. Akka源码分析

2.1 从一个示例开始

classActorTestextendsActor{
vallog = Logging(context.system,this)

overridedefreceive = {
case"test"=> log.info("received test");
case_ => log.info("unknown info")
 }
}

objectActorTest{
defprops(): Props = Props[ActorTest]
}

objectTest1extendsApp{
valsystem = ActorSystem("testSystem")
valactorTest = system.actorOf(ActorTest.props(), name ="actorTest")
 actorTest ! "test"
}

上面这一段代码建立了一个测试Actor,main方法创建ActorSystem,由此获得测试Actor的ActorRef,并且向其发送了一条内容为”test”消息。如果我们运行这段程序的话将会有下面的输出:

[INFO] [12/15/2014 21:36:24.524] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/actorTest] received test

代表ActorTest收到了消息,向log输出了”received test”,这是一个非常简单的例子,下面就将从这个例子开始来分析源码的调用路径以及各个类之间的协作过程。

2.2 Akka中几个主要对象之间的协作

2.2.1 ActorSystem

根据上面的例子,下面依次简要介绍akka中几个主要的class的作用以及依赖关系。

首先,ActorSystem是一组共享同一个配置的一系列actor的管理者和外部接口,一般我们从同一个JVM中只会创建一个ActorSystem并通过这个ActorSystem来进一步去构建各种actor,从而使系统运行起来。在ActorSystem中包含一系列的actor以及系统相关的属性,例如actorPath、scheduler、默认的dispacher等等,除此之外还有一系列的接口。ActorSystem是一个抽象类,其具体实现类为ActorSystemImpl。当我们调用ActorSystem(“testSystem”)时,实际调用了其伴生对象的apply方法,该方法获取相应的classLoader和settings之后,初始化ActorSystemImpl并返回ActorSystem引用

ActorSystemImpl在初始化时主要做了以下几件事:

  • 根据传入的settings反射构造Scheduler
  • 根据settings构造ActorRefProvider(这里具体实例为LocalActorRefProvider,所有的Actor创建最终都delegate给这个类,下面具体讲解)
  • 初始化Mailbox和Dispatcher的工厂类即Mailboxs和Dispatchers
  • 初始化默认的dispacher(即Dispatcher类)
  • 调用start方法做一些其他的初始化工作,准备好ActorSystem提供给Client使用

下面详细介绍ActorSystemImpl中持有的几个重要的引用

2.2.2 ActorRef

此类是一个不可变的代表实际Actor的句柄,在Client端我们对于所有Actor的直接访问都是通过ActorRef来进行的,这里提供一系列Client端可用的操作Actor的方式,比如发送消息等等。ActorRef类是一个抽象类,在上面这个简单的示例中,具体的实现类为LocalActorRef。

2.2.3 ActorCell

在ActorRef中,其只提供相应对于Actor操作的一层封装,而具体的实现是delegate给ActorCell来做的。ActorCell是Akka中的内部API,被标记为 private[akka] ,我们从外部无法访问到这个类。总的来说,大部分具体的Actor的行为都是delegate到这个内部类来做的。可以认为每一个ActorRef持有一个ActorCell。

2.2.4 ActorRefProvider

顾名思义,这个类提供一系列的对ActorRef的构建及相关的操作。其中的 actorOf 方法是所有Client构造ActorRef或者在Actor中构造新的ActorRef时最终会调用的方法,也就是说所有的其他的地方的构造actor的调用最终会delegate到这个类来做。除此之外,ActorRefProvider还持有一系列的系统级的对象,如guardian actor、system guardian actor、dead letters actor、settings等等。ActorSystem以及ActorContext都持有ActorRefProvider相同的引用,ActorSystem以及ActorContext初始化时会先初始化ActorRefProvider,然后再用其相关的变量初始化自身的一些参数。我们注意到ActorRefProvider是一个 trait 。在示例中具体的实现类为LocalActorRefProvider

2.2.5 MessageDispatcher

该类以及其相关的子类根据不同的配置情况实现了消息分发以及处理的主要逻辑。以示例中的场景为例,这里在系统中Akka共用同一个MessageDispatcher对象。然而在表现形式上,ActorSystem持有一个默认的MessageDispatcher,每一个ActorCell持有相应的MessageDispatcher的引用。现在先记住,所有的在Client端的消息发送的操作,最终都会delegate到相应的ActorRef对应的ActorCell的MessageDispatcher上,并且最终调用MessageDispatcher的dispatch方法。这里给出MessageDispatcher默认的实现方法的一段代码,如下:

protected[akka]defdispatch(receiver: ActorCell, invocation: Envelope): Unit = {
valmbox = receiver.mailbox//获取receiver对应的mailbox
 mbox.enqueue(receiver.self, invocation) //将消息入队mailbox
 registerForExecution(mbox, true,false)//将Task提交到底层的ForkJoinPool中执行
 }

代码已经非常清晰,在这里就不做过多解释。

2.2.6 Mailbox

在akka中每一个ActorCell在被初始化时都会初始化一个具体的Mailbox供自己使用,即每一个ActorCell都和自己所独占的某一个Mailbox一一对应,这样一来我们便可以通过各个Actor自己对应的Mailbox来收发邮件,而不会影响到其他的Actor。具体的代码请见下一小节。

2.3 关键源码解析

2.3.1 ActorSystem初始化

还是按照2.1节所提供的简单的例子,首先我们从ActorSystem的初始化开始。talk is cheap, show me the code!以下为经过简化后只保留核心部分的ActorSystem初始化的相关代码:

objectActorSystem{
defapply(name: String): ActorSystem = apply(name, None, None, None)

defapply(name: String, config: Option[Config] = None, classLoader: Option[ClassLoader] = None, defaultExecutionContext: Option[ExecutionContext] = None): ActorSystem = {
valcl = classLoader.getOrElse(findClassLoader())
valappConfig = config.getOrElse(ConfigFactory.load(cl))
newActorSystemImpl(name, appConfig, cl, defaultExecutionContext).start()
 }
}

首先让我们看到这一段代码,这就是在2.1节中对ActorSystem的构造的入口 apply 方法。这一段代码比较容易理解,apply方法分别获取了classLoader和对应的akka的config配置,然后调用ActorSystemImpl的构造方法构造ActorSystem,最后调用start方法做postinit。

接下来让我们进入到ActorSystemImpl的构造方法,注意由于源码比较复杂,所以这里代码为经过简化后的代码,如下:

private[akka]classActorSystemImpl(val name: String, classLoader: ClassLoader, applicationConfig: Config){
finalvalsettings: Settings =newSettings(classLoader, applicationConfig, name)

valeventStream =newEventStream(this, DebugEventStream)

protecteddefsystemImpl: ActorSystemImpl =this

valscheduler: Scheduler = createScheduler()

defdynamicAccess: DynamicAccess = createDynamicAccess()
valprovider: ActorRefProvider = dynamicAccess.createInstanceFor[ActorRefProvider](ProviderClass, arguments).get

valmailboxes: Mailboxes =newMailboxes(settings)
valdispatchers: Dispatchers =newDispatchers(settings)
valdispatcher: ExecutionContextExecutor = dispatchers.defaultGlobalDispatcher

defguardian: LocalActorRef = provider.guardian
defsystemGuardian: LocalActorRef = provider.systemGuardian

privatelazyval_start:this.type=try{
 registerOnTermination(stopScheduler())
// the provider is expected to start default loggers, LocalActorRefProvider does this
 provider.init(this)
if(settings.LogDeadLetters >0)
 logDeadLetterListener = Some(systemActorOf(Props[DeadLetterListener], "deadLetterListener"))
 eventStream.startUnsubscriber()
 loadExtensions()
if(LogConfigOnStart) logConfiguration()
this
 } catch{
caseNonFatal(e) ⇒
tryterminate()catch{caseNonFatal(_) ⇒ Try(stopScheduler()) }
throwe
 }

defstart():this.type= _start
}

以上就是ActorSystemImpl的核心构造方法的代码。首先,ActorSystemImpl根据传入的config和classLoade构造了Settings,其后的许多构造都会依赖于这个Settings对象。然后分别构造EventStream和Scheduler对象。其中DynamicAccess是akka内部使用的一个反射工厂类,随后ActorSystemImpl使用这个类传入相关的arguments(这里省略arguments的定义,arguments为一系列根据settings和相关变量构造的Vector实例)来构造出ActorRefProvider对象。如2.2小节所述,这个对象今后被主要用以提供新的actorRef,即Actor的构造都会被委托到这个类来做。

随后我们根据settings构造了Mailboxs以及Dispatchers类,这两个类这里可以简单认为是一个根据具体的配置来查询或构造Mailbox和Dispatcher的工具类,其中还有一些和配置相关的检查方法。其中Mailboxs主要是用来根据Config和ActorRef查询或构造具体的MailboxType对象,之后便可以通过MailboxType对象的 def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue 方法来构造MessageQueue,并用其去构造Mailbox。而Dispatchers类最主要的是提供一个 def lookup(id: String): MessageDispatcher 方法来根据配置返回具体的Dispatcher对象,在2.1节中的例子,akka中所有的actor都共用同一个Dispatcher对象,即默认的Dispatcher。然后,ActorSystemImpl初始化了系统global dispatcher的引用以及guardian和systemGuardian的actorRef引用。其中guardian和systemGuardian为不同的两个actor,分贝代表用户actorGroup的root以及系统actorGroup的root。具体这两个actor的阐述请见akka文档,在这里就不做赘述。

最后,我们看到 def start() 方法,这个方法做了以下几件事:

  • 注册termination hook
  • 调用ActorRefProvider的init方法做postinit
  • 启动eventStream

在ActorRefProvider的init方法中主要也是做一些初始化工作,具体代码如下:

system = _system
rootGuardian.start()
// chain death watchers so that killing guardian stops the application
systemGuardian.sendSystemMessage(Watch(guardian, systemGuardian))
rootGuardian.sendSystemMessage(Watch(systemGuardian, rootGuardian))
eventStream.startDefaultLoggers(_system)

到此为止,我们的ActorSystem就初始化完毕。

2.3.2 ActorRef构造

在2.1节中,我们通过 val actorTest = system.actorOf(ActorTest.props(), name = "actorTest") 这样一句非常简单的代码来构造出了ActorRef。这里以一副非常简单的关系图展示了ActorRef的内容和依赖:

Akka源码分析(一)

如2.2节所所述,ActorRef的大多数操作实际委托给ActorCell,每一个ActorRef和ActorCell是一一对应的关系。其中ActorCell持有Dispatcher和Mailbox的引用,Dispatcher就2.1节中的例子而言是共用的,所以都指向同一个对象,而Mailbox是每一个ActorCell独占一个对象。Mailbox实现了Runnable方法,在发送消息时,接受者Actor会调用Dispatcher中的 protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit 方法将Mailbox提交到底层的ForkJoinPool中执行,随后会具体介绍。

下面列出经过适当简化后的核心构造过程代码,首先在ActorSystem中调用 def actorOf(props: Props, name: String): ActorRef 方法来请求创建新的ActorRef时,经过一些列的check和异常判断处理最终是调用到了ActorRefProvider的 actorOf 方法,由于其为抽象方法,这里具体的实现者为LocalActorRefProvider。下面为LocalActorRefProvider的 actorOf 方法的核心代码:

try{
valdispatcher = system.dispatchers.lookup(props2.dispatcher)
valmailboxType = system.mailboxes.getMailboxType(props2, dispatcher.configurator.config)

if(async)newRepointableActorRef(system, props2, dispatcher, mailboxType, supervisor, path).initialize(async)
elsenewLocalActorRef(system, props2, dispatcher, mailboxType, supervisor, path)
} catch{
//略
}

首先获取到对应的dispatcher和mailboxType,然后根据async是否为异步actor的标记来决定是创建RepointableActorRef还是LocalActorRef。其中,RepointableActorRef在这里并没有如其javadoc中所述的延迟加载真正的ActorCell的作用,由于其在初始化后马上就调用了initialize方法(这里为了叙述的简洁性,故省略了RepointableActorRef的延迟初始化ActorCell的详细过程)。在RepointableActorRef的构造之中(由于真正的初始化过程比较复杂,所以这里省略延时初始化和swap的过程,因为这里根本没有使用到延时初始化Cell的场景,有兴趣的可以直接查看RepointableActorRef的源码),这里可以简单理解最终构造了ActorCell类,并直接传入了Dispatcher和MailboxType对象并赋值给相关变量。那么从MailboxType具体构造Mailbox的过程又是怎样的呢?让我们来看一下ActorCell的 def init(sendSupervise: Boolean, mailboxType: MailboxType) 方法。

private[akka]traitDispatch{this: ActorCell =>
finaldefinit(sendSupervise: Boolean, mailboxType: MailboxType):this.type= {
valmbox = dispatcher.createMailbox(this, mailboxType)
//前后省略
 }
}

这里ActorCell被隐式转换为Dispatch,init方法会在ActorRef构造ActorCell之后被立即调用。init方法中我们会调用Dispatcher中的 protected[akka] def createMailbox(actor: Cell, mailboxType: MailboxType): Mailbox 方法,我们可以进一步看一下Dispatcher中的具体实现。

protected[akka]defcreateMailbox(actor: akka.actor.Cell, mailboxType: MailboxType): Mailbox = {
newMailbox(mailboxType.create(Some(actor.self), Some(actor.system)))withDefaultSystemMessageQueue
}

这里就已经很明确了,我们调用mailboxType的 create 方法返回一个MessageQueue,并传给Mailbox作为构造函数参数,最终构造出了Mailbox,并赋值给了ActorCell。

目前为止,RepointableActorRef构造完成。

2.3.3 Actor消息发送和接收

最后,我们来看一下ActorRef是怎么发送接收消息,最终落实调用到我们自定义的ActorTest中的 receive 方法。调用ActorRef的 ! 方法后,实际上调用的是对应ActorCell的 sendMessage 方法,具体代码如下:

finaldefsendMessage(message: Any, sender: ActorRef): Unit =
 sendMessage(Envelope(message, sender, system))

这里将message wrap成了Envelop对象,然后继续调用其他重载的 sendMessage 方法,我们省略中间的调用链,于是随后调用到了Dispatch类(可以有ActorCell类隐式转换而来)的 def sendMessage(msg: Envelope): Unit 方法,此方法中实际上核心代码为 dispatcher.dispatch(this, msg) ,其中dispatcher为Dispatcher对象。然后就是之前2.2小节提到过的Dispatcher中的 protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit 方法了,为了方便阅读这里再次把代码贴出来:

protected[akka]defdispatch(receiver: ActorCell, invocation: Envelope): Unit = {
valmbox = receiver.mailbox
 mbox.enqueue(receiver.self, invocation)
 registerForExecution(mbox, true,false)
}

非常简单的逻辑,下面最重要的 registerForExecution 方法会将mailbox提交到ForkJoinPool中执行,这里前面已经提到过,Mailbox类实现了Runnable方法。那么让我们愉快地进入 registerForExecution 方法,下面为该方法的核心代码:

try{
 executorService execute mbox
//省略
} catch{
//省略
}

其实 registerForExecution 做的事情非常简单,就是将Mailbox对象实例mbox提交到executorService中执行,也就是ForkJoinPool。那么,现在让我们来看一看Mailbox中 run 方法的内容:

overridefinaldefrun(): Unit = {
try{
if(!isClosed) {//Volatile read, needed here
 processAllSystemMessages() //First, deal with any system messages
 processMailbox() //Then deal with messages
 }
} finally{
 setAsIdle() //Volatile write, needed here
 dispatcher.registerForExecution(this,false,false)
 }
}

这里注释非常清楚,由于系统消息的优先级比较高,因此首先我们需要处理系统消息,然后才是用户Actor的消息。然后让我们进入 processMailbox 方法一窥究竟:

/**
 * Process the messages in the mailbox
 */
@tailrecprivatefinaldefprocessMailbox(
 left: Int = java.lang.Math.max(dispatcher.throughput, 1),
 deadlineNs: Long = if(dispatcher.isThroughputDeadlineTimeDefined ==true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanoselse0L): Unit =
if(shouldProcessMessage) {
valnext = dequeue()
if(next nenull) {
if(Mailbox.debug) println(actor.self +" processing message "+ next)
 actor invoke next
if(Thread.interrupted())
thrownewInterruptedException("Interrupted while processing actor messages")
 processAllSystemMessages()
if((left >1) && ((dispatcher.isThroughputDeadlineTimeDefined ==false) || (System.nanoTime - deadlineNs) <0))
 processMailbox(left - 1, deadlineNs)
 }
 }

这个方法写得非常漂亮,采用了尾递归的方式来处理多条消息。我们可以看到方法中有两个默认形参,其中left为由Dispatcher指定的throughput,即吞吐量;deadlineNs为由Dispatcher所指定的最长处理结束时间。这里将根据这两个参数来决定是否继续处理用户消息,即如果处理数量没有超过最大吞吐量限制并且处理时间没有超过deadlineNs,并且如果还有可以处理的消息在队列中的话,则继续通过尾递归的方式处理消息。

当方法每dequeue一条消息时,通过调用ActorCell(actor的类型为ActorCell)的invoke方法来处理Envelope(next的类型为Envelope)。具体invoke的细节在这里就不加赘述,最终将会调用到ActorCell所对应的用户自定义的Actor的receive方法上。即Actor类中的 protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = receive.applyOrElse(msg, unhandled)

到此为止,akka就正式处理完毕一条用户Actor所产生的消息。

(Part 1 完)

分享到:更多 ()

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址