博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
dubbo 心跳
阅读量:4652 次
发布时间:2019-06-09

本文共 4768 字,大约阅读时间需要 15 分钟。

HeartBeatTask 类封装了心跳定时任务,需要了解的是 provider 和 consumer 都有可能发送心跳。

final class HeartBeatTask implements Runnable {    private static final Logger logger = LoggerFactory.getLogger( HeartBeatTask.class );    private ChannelProvider channelProvider;    private int             heartbeat;    private int             heartbeatTimeout;    HeartBeatTask( ChannelProvider provider, int heartbeat, int heartbeatTimeout ) {        this.channelProvider = provider;        this.heartbeat = heartbeat;        this.heartbeatTimeout = heartbeatTimeout;    }    public void run() {        try {            long now = System.currentTimeMillis();            for ( Channel channel : channelProvider.getChannels() ) {                if (channel.isClosed()) {                    continue;                }                try {
Long lastRead = ( Long ) channel.getAttribute( HeaderExchangeHandler.KEY_READ_TIMESTAMP ); Long lastWrite = ( Long ) channel.getAttribute( HeaderExchangeHandler.KEY_WRITE_TIMESTAMP ); if ( ( lastRead != null && now - lastRead > heartbeat ) || ( lastWrite != null && now - lastWrite > heartbeat ) ) { Request req = new Request(); req.setVersion( "2.0.0" ); req.setTwoWay( true ); req.setEvent( Request.HEARTBEAT_EVENT ); channel.send( req ); } if (lastRead != null && now - lastRead > heartbeatTimeout) { //如果是 consumer 端 if (channel instanceof Client) { ((Client)channel).reconnect(); } else { // provider 端 channel.close(); } } } catch ( Throwable t ) { } } } catch ( Throwable t ) { logger.warn( "Unhandled exception when heartbeat, cause: " + t.getMessage(), t ); } } interface ChannelProvider { Collection
getChannels(); }}

对于 consumer,是在 HeaderExchangeClient 类中启动心跳定时器,而 provider,则是在 HeaderExchangeServer 中启动心跳定时器。

consumer发送请求时,更新 lastWrite 值,接收响应时,更新 lastRead 值。心跳定时器定时检查 lastRead 和 lastWrite,发送心跳、重连。

public class HeaderExchangeClient implements ExchangeClient {    private static final ScheduledThreadPoolExecutor scheduled =         new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));    // 心跳定时器    private ScheduledFuture
heatbeatTimer; private int heartbeat; private int heartbeatTimeout; public HeaderExchangeClient(Client client){ if (client == null) { throw new IllegalArgumentException("client == null"); } this.client = client; this.channel = new HeaderExchangeChannel(client); String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY); //heartbeat = 60000 this.heartbeat = client.getUrl().getParameter( Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0 ); //heartbeatTimeout = 180000 this.heartbeatTimeout = client.getUrl().getParameter( Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3 ); if ( heartbeatTimeout < heartbeat * 2 ) { throw new IllegalStateException( "heartbeatTimeout < heartbeatInterval * 2" ); } startHeatbeatTimer(); } public ResponseFuture request(Object request) throws RemotingException { return channel.request(request); } private void startHeatbeatTimer() { stopHeartbeatTimer(); if ( heartbeat > 0 ) { heatbeatTimer = scheduled.scheduleWithFixedDelay( new HeartBeatTask( new HeartBeatTask.ChannelProvider() { public Collection
getChannels() { return Collections.
singletonList( HeaderExchangeClient.this ); } }, heartbeat, heartbeatTimeout), heartbeat, heartbeat, TimeUnit.MILLISECONDS ); } }}

在 HeartbeatHandler 类中设置 lastRead 和 lastWrite 值:

public class HeartbeatHandler extends AbstractChannelHandlerDelegate {    //省略其他代码    private void setReadTimestamp(Channel channel) {        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());    }    private void setWriteTimestamp(Channel channel) {        channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());    }}

设置 lastWrite 的调用栈:

设置 lastRead 的调用栈:

转载于:https://www.cnblogs.com/allenwas3/p/8335778.html

你可能感兴趣的文章
MYSQL 常用语句保存
查看>>
json和对象相互转化方法
查看>>
BZOJ 4173: 数学
查看>>
1289 大鱼吃小鱼 1305 Pairwise Sum and Divide 1344 走格子 1347 旋转字符串 1381 硬币游戏...
查看>>
利用Scrapy爬取自己的CSDN博客
查看>>
易语言运算命令的解释(位取反、位于、位或、位异或)
查看>>
真人测试网站用户体验的超棒在线服务 - Peek by UserTesting
查看>>
Rstudio 实现 爬虫 文本分词 个性化词云设计--我爱中国我爱党
查看>>
表单验证提交——submit与button
查看>>
5.1对终端进行读写
查看>>
DirectX中的纹理及其创建
查看>>
20145322第九周JAVA程序设计基础学习总结
查看>>
面向对象(OOP)基本概念
查看>>
02 变量和语句
查看>>
(C#) 多线程访问探讨,如果保证线程安全?
查看>>
NOIP2013火柴排队[逆序对]
查看>>
DataPipeline的增量数据支持回滚功能
查看>>
工厂模式之简单工厂案例
查看>>
SAP技术工作
查看>>
Java: Difference between ArrayList and LinkedList
查看>>