微信搜索bigdata029 | 邀请体验:数阅–数据管理、OLAP分析与可视化平台 | 订阅本站 | 赞助作者:赞助作者

Scala基于Akka的Remote Actor实现的简单RPC

Spark lxw1234@qq.com 5898℃ 1评论

关键字:spark、scala、rpc、akka、remote actor

Spark中的RPC是使用Akka实现的,Akka的设计目标就是为分布式,Actor之间的交互都是通过消息,并且所有动作都是异步的。
在Spark应用程序中会有需要实现RPC的功能,比如:从一个一直运行的Spark Streaming应用程序中实时获取一些数据,或者更新一些变量等等,这时候,可以使用Akka的Remote Actor,将Spark Streaming应用程序作为一个Remote Actor(Server),当需要获取数据时候,可以向Server发送请求消息,Server收到请求并处理之后,将结果返回给客户端,需要注意的是,客户端向Server发送消息之后,需要等待Server将数据响应回来之后,才能处理并退出,这里的客户端,对于Server其实也是一个Remote Actor。
本文学习和介绍使用Scala基于Akka的Remote Actor实现的简单RPC示例程序。

Server端

1. Server端作为一个Remote Actor,在本机的2555端口对外提供服务;

2. 在接收到消息类型为AkkaMessage之后,将消息做一简单处理(在前面加上response_),并封装成Response消息类型,响应给请求者;

package com.lxw1234.rpc

import akka.actor.Actor
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
import akka.actor.Props

case class AkkaMessage(message: Any)
case class Response(response: Any)
/**
 * author http://lxw1234.com
 */
class Server extends Actor {
  override def receive: Receive = {
    //接收到的消息类型为AkkaMessage,则在前面加上response_,返回给sender
    case msg: AkkaMessage => {
      println("服务端收到消息: " + msg.message)
      sender ! Response("response_" + msg.message)
    }
    case _ => println("服务端不支持的消息类型 .. ")
  }
}

object Server {
  //创建远程Actor:ServerSystem
  def main(args: Array[String]): Unit = {
    val serverSystem = ActorSystem("lxw1234", ConfigFactory.parseString("""
      akka {
       actor {
          provider = "akka.remote.RemoteActorRefProvider"
        }
        remote {
          enabled-transports = ["akka.remote.netty.tcp"]
          netty.tcp {
            hostname = "127.0.0.1"
            port = 2555
          }
        }
      }
     """))
     
     serverSystem.actorOf(Props[Server], "server")
     
  }
}

启动Server端:

remote actor

Client端

1.    Client启动之后,亦作为一个Remote Actor;
2.    接收到请求者的消息类型为AkkaMessage之后,将其转发至Server端;
3.    接收到Server端返回的Response之后,将其响应给消息请求者;
4.    请求者在向Client端的Actor发送一个AkkaMessage消息之后,等待响应之后,再继续发送下一个消息;

package com.lxw1234.rpc

import akka.actor.Actor
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
import akka.actor.Props
import akka.actor.ActorSelection
import scala.concurrent.Await
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent._
import scala.concurrent.Future
import Server._

/**
 * author http://lxw1234.com
 */
class Client extends Actor {
  
  //远程Actor
  var remoteActor : ActorSelection = null
  //当前Actor
  var localActor : akka.actor.ActorRef = null
  
  @throws[Exception](classOf[Exception])
  override def preStart(): Unit = {
    remoteActor = context.actorSelection("akka.tcp://lxw1234@127.0.0.1:2555/user/server")
    println("远程服务端地址 : " + remoteActor)
  }
  
  override def receive: Receive = {
    //接收到消息类型为AkkaMessage后,将消息转发至远程Actor
    case msg: AkkaMessage => {
      println("客户端发送消息 : " + msg)
      this.localActor = sender()
      remoteActor ! msg
    }
    //接收到远程Actor发送的消息类型为Response,响应
    case res: Response => {
      localActor ! res
    }
    case _ => println("客户端不支持的消息类型 .. ")
    
  }
}

object Client {
  def main(args: Array[String]) : Unit = {
    val clientSystem = ActorSystem("ClientSystem", ConfigFactory.parseString("""
      akka {
       actor {
          provider = "akka.remote.RemoteActorRefProvider"
        }
      }
     """))
     
     var client = clientSystem.actorOf(Props[Client])
     var msgs = Array[AkkaMessage](AkkaMessage("message1"),AkkaMessage("message2"),AkkaMessage("message3"),AkkaMessage("message4"))
     
     implicit val timeout = Timeout(3 seconds)
     
     msgs.foreach { x => 
       val future = client ? x
       val result = Await.result(future,timeout.duration).asInstanceOf[Response]
       println("收到的反馈: " + result)
     }
     
//     msgs.foreach { x => 
//       client ! x
//     }
    
     clientSystem.shutdown()
     
  }
}

Client端运行之后:

remote actor

Client端启动之后,在本地的2552端口运行了Remote Actor;

每次发送一个消息之后,等到响应Response之后,才发送下一个消息;

所有消息发送并收到响应之后,shutdown;

此时,Server端打印出:

remote actor

如果将客户端的消息发送改为异步模式,运行结果如下:

remote actor

客户端将4条消息发送出去,没有等待响应就已经shutdown了。


您可以关注 lxw的大数据田地 ,或者 加入邮件列表 ,随时接收博客更新的通知邮件。

 

 

如果觉得本博客对您有帮助,请 赞助作者

转载请注明:lxw的大数据田地 » Scala基于Akka的Remote Actor实现的简单RPC

喜欢 (9)
分享 (0)
发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
(1)个小伙伴在吐槽
  1. 这个很赞!通过Rpc,我建立了一个Spark程序,读取ML训练的Model,然后客户端通过RPC发送新的内容以便获得prediction。
    2016-09-07 16:41 回复