关键字:spark、scala、rpc、akka、remote actor
Spark中的RPC是使用Akka实现的,Akka的设计目标就是为分布式,Actor之间的交互都是通过消息,并且所有动作都是异步的。
在Spark应用程序中会有需要实现RPC的功能,比如:从一个一直运行的Spark Streaming应用程序中实时获取一些数据,或者更新一些变量等等,这时候,可以使用Akka的Remote Actor,将Spark Streaming应用程序作为一个Remote Actor(Server),当需要获取数据时候,可以向Server发送请求消息,Server收到请求并处理之后,将结果返回给客户端,需要注意的是,客户端向Server发送消息之后,需要等待Server将数据响应回来之后,才能处理并退出,这里的客户端,对于Server其实也是一个Remote Actor。
本文学习和介绍使用Scala基于Akka的Remote Actor实现的简单RPC示例程序。
Server端
1. Server端作为一个Remote Actor,在本机的2555端口对外提供服务;
2. 在接收到消息类型为AkkaMessage之后,将消息做一简单处理(在前面加上response_),并封装成Response消息类型,响应给请求者;
package com.lxw1234.rpc import akka.actor.Actor import akka.actor.ActorSystem import com.typesafe.config.ConfigFactory import akka.actor.Props case class AkkaMessage(message: Any) case class Response(response: Any) /** * author http://lxw1234.com */ class Server extends Actor { override def receive: Receive = { //接收到的消息类型为AkkaMessage,则在前面加上response_,返回给sender case msg: AkkaMessage => { println("服务端收到消息: " + msg.message) sender ! Response("response_" + msg.message) } case _ => println("服务端不支持的消息类型 .. ") } } object Server { //创建远程Actor:ServerSystem def main(args: Array[String]): Unit = { val serverSystem = ActorSystem("lxw1234", ConfigFactory.parseString(""" akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 2555 } } } """)) serverSystem.actorOf(Props[Server], "server") } }
启动Server端:
Client端
1. Client启动之后,亦作为一个Remote Actor;
2. 接收到请求者的消息类型为AkkaMessage之后,将其转发至Server端;
3. 接收到Server端返回的Response之后,将其响应给消息请求者;
4. 请求者在向Client端的Actor发送一个AkkaMessage消息之后,等待响应之后,再继续发送下一个消息;
package com.lxw1234.rpc import akka.actor.Actor import akka.actor.ActorSystem import com.typesafe.config.ConfigFactory import akka.actor.Props import akka.actor.ActorSelection import scala.concurrent.Await import akka.pattern.ask import akka.util.Timeout import scala.concurrent.duration._ import scala.concurrent._ import scala.concurrent.Future import Server._ /** * author http://lxw1234.com */ class Client extends Actor { //远程Actor var remoteActor : ActorSelection = null //当前Actor var localActor : akka.actor.ActorRef = null @throws[Exception](classOf[Exception]) override def preStart(): Unit = { remoteActor = context.actorSelection("akka.tcp://lxw1234@127.0.0.1:2555/user/server") println("远程服务端地址 : " + remoteActor) } override def receive: Receive = { //接收到消息类型为AkkaMessage后,将消息转发至远程Actor case msg: AkkaMessage => { println("客户端发送消息 : " + msg) this.localActor = sender() remoteActor ! msg } //接收到远程Actor发送的消息类型为Response,响应 case res: Response => { localActor ! res } case _ => println("客户端不支持的消息类型 .. ") } } object Client { def main(args: Array[String]) : Unit = { val clientSystem = ActorSystem("ClientSystem", ConfigFactory.parseString(""" akka { actor { provider = "akka.remote.RemoteActorRefProvider" } } """)) var client = clientSystem.actorOf(Props[Client]) var msgs = Array[AkkaMessage](AkkaMessage("message1"),AkkaMessage("message2"),AkkaMessage("message3"),AkkaMessage("message4")) implicit val timeout = Timeout(3 seconds) msgs.foreach { x => val future = client ? x val result = Await.result(future,timeout.duration).asInstanceOf[Response] println("收到的反馈: " + result) } // msgs.foreach { x => // client ! x // } clientSystem.shutdown() } }
Client端运行之后:
Client端启动之后,在本地的2552端口运行了Remote Actor;
每次发送一个消息之后,等到响应Response之后,才发送下一个消息;
所有消息发送并收到响应之后,shutdown;
此时,Server端打印出:
如果将客户端的消息发送改为异步模式,运行结果如下:
客户端将4条消息发送出去,没有等待响应就已经shutdown了。
您可以关注 lxw的大数据田地 ,或者 加入邮件列表 ,随时接收博客更新的通知邮件。
如果觉得本博客对您有帮助,请 赞助作者 。