注册 登录  
 加关注
查看详情
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

淡泊明智

 
 
 

日志

 
 

MINA2收包中对粘包的处理  

2012-11-15 17:32:25|  分类: Mina介绍 |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |
 MINA2中(MINA2 RC版本,MINA2.0正式版已经发布)服务端接受数据默认有一定长度的缓冲区(可以在启动的时候设置)。那么对于大报文,怎么处理呢?比如说超过1024,甚至更多?MINA2为了节省网络流量,提高处理效率,会将大报文自动拆分(可能是存放MINA2中的缓冲区里面):比如2048字节的报文,就会拆分成两次;那么在接受的时候,就有一个如何判断是完整报文的问题,或者说是一个拆包组包的问题。
  MINA2中初始化服务的时候是可以设置输入和输出的缓冲区的:
 
Java代码 复制代码 收藏代码MINA2收包中对粘包的处理 - 火木棉 - 淡泊明智
acceptor.getSessionConfig().setReadBufferSize(1024);  
     acceptor.getSessionConfig().setReadBufferSize(1024);  

    MINA2提供的案例是,在IoSession中设置一个类似于session,存在在当前IoSession中的全局变量,在此IoSession中有效。
 
Java代码 复制代码 收藏代码MINA2收包中对粘包的处理 - 火木棉 - 淡泊明智
private final AttributeKey TEST = new AttributeKey(getClass(), "TEST");   
    
private final AttributeKey TEST = new AttributeKey(getClass(), "TEST");    

  大家都知道,通过 SOCKET TCP/IP传输过来的报文是不知道边界的,所以一般会约定在前端固定长度的字节加上报文长度,让SERVER来根据这个长度来确定整个报文的边界,在我前面的博文有提到。其实MINA2中有:
  prefixedDataAvailable(4) int
方法,来判断固定长度的报文长度,但是参数只能是1,2,4;该方法很好用。判断前四字节的整型值是否大于等于整个缓冲区的数据。可以方便的判断一次 messageReceived 过来的数据是否完整。(前提是自己设计的网络通讯协议前四字节等于发送数据的长度) ,如果你不是设定1,2,4字节来作为长度的话,那么就没辙了。
   在你的解码操作中,MINA2的缓冲区发多少次报文,你的decode方法就会调用多少次。
  上面设置了session之后,可以采用一个方法:
 
Java代码 复制代码 收藏代码MINA2收包中对粘包的处理 - 火木棉 - 淡泊明智
/**  
 *   
 * @param session  
 *            会话信息  
 * @return 返回session中的累积  
 */  
private Context getContext(IoSession session) {   
    Context ctx = (Context) session.getAttribute(CONTEXT);   
    if (ctx == null) {   
        ctx = new Context();   
        session.setAttribute(CONTEXT, ctx);   
    }   
    return ctx;   
}   
 /**    *     * @param session    *            会话信息    * @return 返回session中的累积    */   private Context getContext(IoSession session) {    Context ctx = (Context) session.getAttribute(CONTEXT);    if (ctx == null) {     ctx = new Context();     session.setAttribute(CONTEXT, ctx);    }    return ctx;   }   

然后在你的decode方法中,首先从session取出数据对象,进行拼接:
 
Java代码 复制代码 收藏代码MINA2收包中对粘包的处理 - 火木棉 - 淡泊明智
Context ctx = getContext(session);   
  
 // 先把当前buffer中的数据追加到Context的buffer当中   
 ctx.append(ioBuffer);   
 // 把position指向0位置,把limit指向原来的position位置   
 IoBuffer buf = ctx.getBuffer();   
 buf.flip();  
 Context ctx = getContext(session);      // 先把当前buffer中的数据追加到Context的buffer当中    ctx.append(ioBuffer);    // 把position指向0位置,把limit指向原来的position位置    IoBuffer buf = ctx.getBuffer();    buf.flip();  

 
接着读取每次报文的总长度:
Java代码 复制代码 收藏代码MINA2收包中对粘包的处理 - 火木棉 - 淡泊明智
// 读取消息头部分   
byte[] bLeng = new byte[packHeadLength];   
buf.get(bLeng);   
int length = -1;   
try {   
  length = Integer.parseInt(new String(bLeng));   
} catch (NumberFormatException ex) {   
  ex.printStackTrace();   
}   
if (length > 0) {   
  ctx.setMsgLength(length);   
}  
// 读取消息头部分  byte[] bLeng = new byte[packHeadLength];  buf.get(bLeng);  int length = -1;  try {    length = Integer.parseInt(new String(bLeng));  } catch (NumberFormatException ex) {    ex.printStackTrace();  }  if (length > 0) {    ctx.setMsgLength(length);  }  

在读取到每次报文的长度之后,就接着循环判断BUF里面的字节数据是否已经全部接受完毕了,如果没有接受完毕,那么就不处理;下面是完整处理的代码:
 
Java代码 复制代码 收藏代码MINA2收包中对粘包的处理 - 火木棉 - 淡泊明智
while (buf.remaining() >= packHeadLength) {   
    buf.mark();   
    // 设置总长度   
    if (ctx.getMsgLength() <= 0) {   
        // 读取消息头部分   
        byte[] bLeng = new byte[packHeadLength];   
        buf.get(bLeng);   
        int length = -1;   
        try {   
            length = Integer.parseInt(new String(bLeng));   
            } catch (NumberFormatException ex) {   
                        ex.printStackTrace();   
            }   
            if (length > 0) {   
              ctx.setMsgLength(length);   
               }   
            }   
  
  
            // 读取消息头部分   
            int length = ctx.getMsgLength();   
            // 检查读取的包头是否正常,不正常的话清空buffer   
            if (length < 0) { // || length > maxPackLength2) {   
            buf.clear();   
            out.write("ERROR!");   
            break;   
            // 读取正常的消息包,并写入输出流中,以便IoHandler进行处理   
          } else if (length > packHeadLength && buf.remaining() >= length) {   
            //完整的数据读取之后,就可以开始做你自己想做的操作了                        
    } else {   
        // 如果消息包不完整   
        // 将指针重新移动消息头的起始位置   
        buf.reset();   
        break;   
    }   
     }   
    if (buf.hasRemaining()) { // 如果有剩余的数据,则放入Session中   
       // 将数据移到buffer的最前面   
             IoBuffer temp = IoBuffer.allocate(2048).setAutoExpand(   
                        true);   
       temp.put(buf);   
       temp.flip();   
       buf.clear();   
       buf.put(temp);   
  
    } else { // 如果数据已经处理完毕,进行清空   
    buf.clear();   
}   
   
while (buf.remaining() >= packHeadLength) {   buf.mark();   // 设置总长度   if (ctx.getMsgLength() <= 0) {    // 读取消息头部分    byte[] bLeng = new byte[packHeadLength];    buf.get(bLeng);    int length = -1;    try {     length = Integer.parseInt(new String(bLeng));     } catch (NumberFormatException ex) {        ex.printStackTrace();     }     if (length > 0) {       ctx.setMsgLength(length);        }     }         // 读取消息头部分     int length = ctx.getMsgLength();     // 检查读取的包头是否正常,不正常的话清空buffer     if (length < 0) { // || length > maxPackLength2) {     buf.clear();     out.write("ERROR!");     break;     // 读取正常的消息包,并写入输出流中,以便IoHandler进行处理            } else if (length > packHeadLength && buf.remaining() >= length) {     //完整的数据读取之后,就可以开始做你自己想做的操作了         } else {       // 如果消息包不完整       // 将指针重新移动消息头的起始位置       buf.reset();       break;   }       }      if (buf.hasRemaining()) { // 如果有剩余的数据,则放入Session中      // 将数据移到buffer的最前面               IoBuffer temp = IoBuffer.allocate(2048).setAutoExpand(        true);      temp.put(buf);      temp.flip();      buf.clear();      buf.put(temp);        } else { // 如果数据已经处理完毕,进行清空   buf.clear();  }   

为了便于操作,最好设置一个内部类:
Java代码 复制代码 收藏代码MINA2收包中对粘包的处理 - 火木棉 - 淡泊明智
private class Context {   
        private final CharsetDecoder decoder;   
        private IoBuffer buf;   
        private int msgLength = 0;   
        private int overflowPosition = 0;   
  
        /**  
         *   
         *   
         */  
        private Context() {   
            decoder = charset.newDecoder();   
            buf = IoBuffer.allocate(80).setAutoExpand(true);   
        }   
  
        /**  
         *   
         *   
         * @return CharsetDecoder  
         */  
        public CharsetDecoder getDecoder() {   
            return decoder;   
        }   
  
        /**  
         *   
         *   
         * @return IoBuffer  
         */  
        public IoBuffer getBuffer() {   
            return buf;   
        }   
  
        /**  
         *   
         *   
         * @return overflowPosition  
         */  
        public int getOverflowPosition() {   
            return overflowPosition;   
        }   
  
        /**  
         *   
         *  
         * @return matchCount  
         */  
        public int getMsgLength() {   
            return msgLength;   
        }   
  
        /**  
         *   
         *   
         * @param matchCount  
         *            报文长度  
         */  
        public void setMsgLength(int msgLength) {   
            this.msgLength = msgLength;   
        }   
  
        /**  
         *   
         *   
         */  
        public void reset() {   
            this.buf.clear();   
            this.overflowPosition = 0;   
            this.msgLength = 0;   
            this.decoder.reset();   
        }   
  
        /**  
         *   
         * @param in  
         *            输入流  
         */  
        public void append(IoBuffer in) {   
            getBuffer().put(in);   
  
        }   
  
    }  
private class Context {    private final CharsetDecoder decoder;    private IoBuffer buf;    private int msgLength = 0;    private int overflowPosition = 0;      /**     *      *      */    private Context() {     decoder = charset.newDecoder();     buf = IoBuffer.allocate(80).setAutoExpand(true);    }      /**     *      *      * @return CharsetDecoder     */    public CharsetDecoder getDecoder() {     return decoder;    }      /**     *      *      * @return IoBuffer     */    public IoBuffer getBuffer() {     return buf;    }      /**     *      *      * @return overflowPosition     */    public int getOverflowPosition() {     return overflowPosition;    }      /**     *      *     * @return matchCount     */    public int getMsgLength() {     return msgLength;    }      /**     *      *      * @param matchCount     *            报文长度     */    public void setMsgLength(int msgLength) {     this.msgLength = msgLength;    }      /**     *      *      */    public void reset() {     this.buf.clear();     this.overflowPosition = 0;     this.msgLength = 0;     this.decoder.reset();    }      /**     *      * @param in     *            输入流     */    public void append(IoBuffer in) {     getBuffer().put(in);      }     }  

  评论这张
 
阅读(485)| 评论(0)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2018