RPC(Remote Procedure Call, 远程过程调用),是一种计算机通信协议。对于两台机器而言,就是 A 服务器上的应用程序调用 B 服务器上的函数或者方法,由于不在同一个内存空间或机器上运行,因此需要借助于网络通信。

 

1. RPC 框架

我们首先通过一张图理解 RPC 的工作流程:

因此,实现一个最简单的 RPC 服务,只需要 Client、Server 和 Network,本文就是利用消息中间件 RabbitMQ 作为 Network 载体传输信息,实现简单的 RPC 服务。简单原理可如下图所示:

即:当 Client 发送 RPC 请求时,Client 端是消息生产者,Server 端是消息消费者;当 Server 返回结果时,Server 端是消息生产者,Client 是消息消费者;发送和返回使用不同的队列。

接下来我们通过代码,详细展示一个计算斐波那契数列的 RPC 服务。

 

2. RPCServer 实现

2.1 Server 初始化

1.  `/**`
2.  `* 队列名、交换机名、路由键`
3.  `*/`
4.  `private static final String EXCHANGE_NAME = "rpc_exchange";`
5.  `private static final String QUEUE_NAME = "request_rpc_queue";`
6.  `private static final String ROUTING_KEY = "rpc_routing_key";`
7.  `private Connection connection = null;`
8.  `private Channel channel = null;`
9.  `private QueueingConsumer consumer = null;`
10.  `/**`
11.  `* Server的构造函数`
12.  `*/`
13.  `private RPCServer() {`
14.  `try {`
15.  `//创建链接`
16.  `ConnectionFactory factory = new ConnectionFactory();`
17.  `factory.setHost(Config.HOST);`
18.  `factory.setPort(Config.PORT);`
19.  `factory.setUsername(Config.USER);`
20.  `factory.setPassword(Config.PASSWORD);`
21.  `connection = factory.newConnection();`
22.  `//创建信道`
23.  `channel = connection.createChannel();`
24.  `//设置AMQP的通信结构`
25.  `channel.exchangeDeclare(EXCHANGE_NAME, "direct");`
26.  `channel.queueDeclare(QUEUE_NAME, false, false, false, null);`
27.  `channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);`
28.  `//设置消费者`
29.  `consumer = new QueueingConsumer(channel);`
30.  `channel.basicConsume(QUEUE_NAME, false, QUEUE_NAME, consumer);`
31.  `} catch (Exception e) {`
32.  `LOG.error("build connection failed!", e);`
33.  `}`
34.  `}`

初始化就是声明 RabbitMQ 的链接工厂、链接、信道、队列、交换机等等,并做了绑定,由此构成了 AMQP 的通信结构。

2.2 监听队列并反馈

1.  `/**`
2.  `* 开启server`
3.  `*/`
4.  `private void startServer() {`
5.  `try {`
6.  `LOG.info("Waiting for RPC calls.....");`
7.  `while (true) {`
8.  `//获得文本消息`
9.  `QueueingConsumer.Delivery delivery = consumer.nextDelivery();`
10.  `BasicProperties props = delivery.getProperties();`
11.  `//返回消息的属性`
12.  `BasicProperties replyProps = new BasicProperties.Builder()`
13.  `.correlationId(props.getCorrelationId())`
14.  `.build();`
15.  `long receiveTime = System.currentTimeMillis();`
16.  `JSONObject json = new JSONObject();`
17.  `try {`
18.  `String message = new String(delivery.getBody(), "UTF-8");`
19.  `int n = Integer.parseInt(message);`
20.  `LOG.info("Got a request: fib(" + message + ")");`
21.  `json.put("status", "success");`
22.  `json.put("result", fib(n));`
23.  `} catch (Exception e) {`
24.  `json.put("status", "fail");`
25.  `json.put("reason", "Not a Number!");`
26.  `LOG.error("receive message failed!", e);`
27.  `} finally {`
28.  `long responseTime = System.currentTimeMillis();`
29.  `json.put("calculateTime", (responseTime - receiveTime));`
30.  `channel.basicPublish("", props.getReplyTo(), replyProps, json.toString().getBytes("UTF-8"));`
31.  `channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);`
32.  `}`
33.  `}`
34.  `} catch (Exception e) {`
35.  `LOG.error("server failed!", e);`
36.  `} finally {`
37.  `if (connection != null) {`
38.  `try {`
39.  `connection.close();`
40.  `} catch (Exception e) {`
41.  `LOG.error("close failed!", e);`
42.  `}`
43.  `}`
44.  `}`
45.  `}`

在该方法中使用了一个无限循环,每次处理一条消息。通过调用消费者对象的 nextDelivery 方法来获得 RabbitMQ 队列的最新一条消息。同时通过 getProperties 获取到消息中的反馈信息属性,用于标记客户端 Client 的属性。然后计算斐波那契数列的结果。
最后通过 basicAck 使用消息信封向 RabbitMQ 确认了该消息。

到这里就实现了计算斐波那契数列 RPC 服务的 Server 端。

 

3. RPCClient 实现

3.1 初始化 CLient

1.  `/**`
2.  `* 消息请求的队列名、交换机名、路由键`
3.  `*/`
4.  `private static final String EXCHANGE_NAME = "rpc_exchange";`
5.  `private static final String QUEUE_NAME = "request_rpc_queue";`
6.  `private static final String ROUTING_KEY = "rpc_routing_key";`
7.  `/**`
8.  `* 消息返回的队列名、交换机名、路由键`
9.  `*/`
10.  `private static final String RESPONSE_QUEUE = "response_rpc_queue";`
11.  `private static final String RESPONSE_ROUTING_KEY = "response_rpc_routing_key";`
12.  `/**`
13.  `* RabbitMQ的实体`
14.  `*/`
15.  `private Connection connection = null;`
16.  `private Channel channel = null;`
17.  `private QueueingConsumer consumer = null;`
18.  `/**`
19.  `* 构造客户端`
20.  `* @throws Exception`
21.  `*/`
22.  `private RPCClient() throws Exception {`
23.  `ConnectionFactory factory = new ConnectionFactory();`
24.  `factory.setHost(Config.HOST);`
25.  `factory.setPort(Config.PORT);`
26.  `factory.setUsername(Config.USER);`
27.  `factory.setPassword(Config.PASSWORD);`
28.  `connection = factory.newConnection();`
29.  `channel = connection.createChannel();`
30.  `channel.exchangeDeclare(EXCHANGE_NAME, "direct");`
31.  `channel.queueDeclare(QUEUE_NAME, false, false, false, null);`
32.  `channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);`
33.  `channel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);`
34.  `channel.queueBind(RESPONSE_QUEUE, EXCHANGE_NAME, RESPONSE_ROUTING_KEY);`
35.  `consumer = new QueueingConsumer(channel);`
36.  `channel.basicConsume(RESPONSE_QUEUE, true, consumer);`
37.  `}`

这里声明 AMQP 结构体的方式和 Server 端类似,只不过 Client 端需要多声明一个队列,用于 RPC 的 response。

3.2 发送 / 接收消息

1.  `/**`
2.  `* 请求server`
3.  `* @param message`
4.  `* @return`
5.  `* @throws Exception`
6.  `*/`
7.  `private String requestMessage(String message) throws Exception {`
8.  `String response = null;`
9.  `String corrId = UUID.randomUUID().toString();`
10.  `BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(RESPONSE_QUEUE).build();`
11.  `channel.basicPublish("", QUEUE_NAME, props, message.getBytes("UTF-8"));`
12.  `while (true) {`
13.  `QueueingConsumer.Delivery delivery = consumer.nextDelivery();`
14.  `if (delivery.getProperties().getCorrelationId().equals(corrId)) {`
15.  `response = new String(delivery.getBody(),"UTF-8");`
16.  `break;`
17.  `}`
18.  `}`
19.  `return response;`
20.  `}`

BasicProperties 用于存储你请求消息的属性,这里我设置了 correlationId 和 replyTo 属性,用于 Server 端的返回识别。

 

4. 运行测试

Client 端发送:

Server 端接收并处理:

Client 收到计算结果:

由于我运行 RabbitMQ 的服务器是租用的阿里云的,差不多传输时延在 60ms 左右,如果把 RPC 服务和消息中间件同机房部署的话延时基本上就在 ms 级别。

 

5. FAQ

5.1 说明

需要体验完整的过程,你需要如下环境:

1. `JDK1.6以上 + Maven+ RabbitMQ`

5.2 源代码

完整代码代码请戳:github:https://github.com/chadwick521/rabbitmqdemo

其中 Server 的代码在:

1. `rpc.RPCServer`

Client 端的代码位置:

1. `rpc.RPCClient`

以上内容就是关于基于消息中间件 RabbitMQ 实现简单的 RPC 服务的全部内容了,谢谢你阅读到了这里!