Hand-Rolling RPC, Part 2
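What happens if we go from a single client to several?
After a few clients connected successfully, the program threw an error while unpacking a packet. Why? Look at how Netty behaves when multiple clients are connected.
The server reading data out of the kernel buffer and the client writing data into that buffer are two independent actions. With multiple clients (multiple sockets), the server therefore cannot count on each read lining up with exactly one complete packet, which is why it reports a decoding error. IO is bidirectional, so the client runs into the same decoding problem as the server. How do we deal with it?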
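The mismatch between writes and reads can be reproduced without any networking. The sketch below is only an illustration: the class name `StreamBoundaryDemo`, the message strings, and the chunk size of 12 are all assumptions made for the example, and a `ByteArrayInputStream` stands in for the socket.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class StreamBoundaryDemo {

    // Concatenate messages the way a sender writes them to a socket:
    // back to back, with no separator between them.
    static byte[] concat(String... messages) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String m : messages) {
            byte[] b = m.getBytes(StandardCharsets.UTF_8);
            out.write(b, 0, b.length);
        }
        return out.toByteArray();
    }

    // Read the stream in fixed-size chunks. The chunk size stands in for
    // "however many bytes happened to be available when the reader ran".
    static List<String> readInChunks(byte[] stream, int chunkSize) {
        List<String> chunks = new ArrayList<>();
        ByteArrayInputStream in = new ByteArrayInputStream(stream);
        byte[] buf = new byte[chunkSize];
        int n;
        while ((n = in.read(buf, 0, buf.length)) > 0) {
            chunks.add(new String(buf, 0, n, StandardCharsets.UTF_8));
        }
        return chunks;
    }

    public static void main(String[] args) {
        byte[] stream = concat("request-1", "request-2", "request-3");
        for (String chunk : readInChunks(stream, 12)) {
            System.out.println("read: [" + chunk + "]");
        }
    }
}
```

None of the chunks read back corresponds to one whole message, which is the situation the server's handler is in when it tries to deserialize whatever a single read delivered.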
Handling decoding of the communication stream
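The underlying cause is that the bytes of many packets accumulate in the same buffer. The program therefore has to make sure that every read starts aligned with the head of a packet (the start of the header, then the start of the body), and that the complete packet for a request has arrived before the business handler's channelRead() is invoked for it. Netty provides ByteToMessageDecoder for exactly this purpose: add a decoding handler to the pipeline in front of the business handler and the problem goes away.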
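The idea behind such a decoder can be sketched in plain JDK code: keep the undecoded bytes, peek at the header, and only consume a packet once header and body are both fully present. This is a simplified illustration rather than Netty's implementation; `FrameAccumulator`, its `feed` method, and the 4-byte length prefix are assumptions made for the example (the article's own protocol uses a serialized header object instead).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FrameAccumulator {
    // Assumed wire format for this sketch: 4-byte big-endian body length, then the body.
    private static final int HEADER_LEN = 4;

    // Bytes received but not yet decoded, kept in read mode.
    private ByteBuffer pending = ByteBuffer.allocate(0);

    static byte[] encode(String payload) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(HEADER_LEN + body.length).putInt(body.length).put(body).array();
    }

    // Feed one chunk as delivered by a read; returns every frame that is now complete.
    List<String> feed(byte[] chunk) {
        // Append the new chunk to whatever was left over from earlier reads.
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending).put(chunk).flip();
        pending = merged;

        List<String> frames = new ArrayList<>();
        while (pending.remaining() >= HEADER_LEN) {
            // Peek at the length without moving the read position.
            int bodyLen = pending.getInt(pending.position());
            if (pending.remaining() - HEADER_LEN < bodyLen) {
                break; // body not complete yet: keep the bytes and wait for the next chunk
            }
            pending.position(pending.position() + HEADER_LEN); // consume the header
            byte[] body = new byte[bodyLen];
            pending.get(body);                                  // consume the body
            frames.add(new String(body, StandardCharsets.UTF_8));
        }
        return frames;
    }

    public static void main(String[] args) {
        FrameAccumulator acc = new FrameAccumulator();
        byte[] frame = encode("request-1");
        // The first read delivers only part of the frame, the second read the rest.
        System.out.println(acc.feed(Arrays.copyOfRange(frame, 0, 6)));
        System.out.println(acc.feed(Arrays.copyOfRange(frame, 6, frame.length)));
    }
}
```

The important detail is that the header is only peeked at until the whole packet is available, so an incomplete packet leaves the buffer untouched for the next read.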
Other problems solved along the way
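1. The server uses 20 threads to handle both the listen socket (accepting connections) and the IO sockets.
2. Multiple clients connect, and a connection pool is maintained to manage those connections.
3. Netty's ByteToMessageDecoder is used to fix the incorrect decoding.
4. CompletableFuture is used to obtain the return value of the method the client invokes.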
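Point 4 boils down to matching each response to the call that is waiting for it. A minimal sketch of that pattern follows, assuming a hypothetical `PendingCalls` class and an `AtomicLong` counter for request IDs (the listing in this article derives its IDs from a UUID instead); a plain thread stands in for the Netty IO thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class PendingCalls {
    // Requests that have been sent but not yet answered, keyed by request ID.
    private static final ConcurrentHashMap<Long, CompletableFuture<String>> PENDING =
            new ConcurrentHashMap<>();
    private static final AtomicLong NEXT_ID = new AtomicLong();

    // Caller side: register the future before the request is written to the channel.
    static long register(CompletableFuture<String> future) {
        long id = NEXT_ID.incrementAndGet();
        PENDING.put(id, future);
        return id;
    }

    // IO side: complete the matching future and forget it.
    // Returns false if no call is waiting under that ID.
    static boolean complete(long requestId, String result) {
        CompletableFuture<String> future = PENDING.remove(requestId);
        return future != null && future.complete(result);
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = new CompletableFuture<>();
        long id = register(future);
        // Stand-in for the IO thread delivering the response some time later.
        new Thread(() -> complete(id, "result for request " + id)).start();
        // The calling thread blocks here until the response arrives (or the timeout hits).
        System.out.println(future.get(1, TimeUnit.SECONDS));
    }
}
```

Registering the future before sending matters: if the response could arrive before the entry is in the map, it would be dropped and the caller would wait forever.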
package rpc;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.junit.Test;

import java.io.*;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class MyRpcTest {

    @Test
    public void startServer() {
        // One group of 20 threads serves both the listen socket and the IO sockets.
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup(20);
        NioEventLoopGroup worker = eventExecutors;
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        ChannelFuture localhost = serverBootstrap.group(eventExecutors, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) {
                        ChannelPipeline pipeline = nioSocketChannel.pipeline();
                        // The decoder must come before the business handler.
                        pipeline.addLast(new MyDecoder());
                        pipeline.addLast(new ServerRequestHandler());
                    }
                })
                .bind(new InetSocketAddress("localhost", 9090));
        try {
            localhost.sync().channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void get() {
        new Thread(() -> {
            startServer();
        }).start();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("server start ....");

        AtomicInteger atomicInteger = new AtomicInteger();
        int size = 30;
        Thread[] threads = new Thread[size];
        for (int i = 0; i < size; i++) {
            threads[i] = new Thread(() -> {
                Car car = proxyGet(Car.class);
                int args = atomicInteger.incrementAndGet();
                String s = car.race("client: " + args);
                System.out.println("client got args: " + s + " --- " + args);
            });
        }
        for (Thread thread : threads) {
            thread.start();
        }
        try {
            // Block so the test does not exit before the responses arrive.
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @SuppressWarnings("unchecked")
    private static <T> T proxyGet(Class<T> interfaceInfo) {
        ClassLoader classLoader = interfaceInfo.getClassLoader();
        Class<?>[] interfaces = {interfaceInfo};
        return (T) Proxy.newProxyInstance(classLoader, interfaces, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // 1. Describe the call: interface, method, parameter types, arguments.
                String name = interfaceInfo.getName();
                String methodName = method.getName();
                Class<?>[] parameterTypes = method.getParameterTypes();
                MyContent content = new MyContent();
                content.setName(name);
                content.setMethodName(methodName);
                content.setParameterTypes(parameterTypes);
                content.setArgs(args);

                // 2. Serialize the body first, because the header carries the body length.
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutputStream oos = new ObjectOutputStream(bos);
                oos.writeObject(content);
                byte[] msgBody = bos.toByteArray();

                MyHeader myHeader = createHeader(msgBody);
                bos.reset();
                oos = new ObjectOutputStream(bos);
                oos.writeObject(myHeader);
                byte[] msgHeader = bos.toByteArray();

                // 3. Take a connection from the pool.
                ClientFactory factory = ClientFactory.getFactory();
                NioSocketChannel client = factory.getClient(new InetSocketAddress("localhost", 9090));

                // 4. Register the future before sending, then write header + body.
                ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(msgHeader.length + msgBody.length);
                CompletableFuture<String> future = new CompletableFuture<>();
                ResponseMappingHandler.addCallback(myHeader.getRequestId(), future);
                byteBuf.writeBytes(msgHeader);
                byteBuf.writeBytes(msgBody);
                ChannelFuture channelFuture = client.writeAndFlush(byteBuf);
                channelFuture.sync();

                // 5. Block until the response handler completes the future.
                return future.get();
            }
        });
    }

    static MyHeader createHeader(byte[] msgBytes) {
        MyHeader header = new MyHeader();
        int size = msgBytes.length;
        int f = 0x14141414; // flag value marking a request
        long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
        header.setFlag(f);
        header.setRequestId(requestId);
        header.setDataLen(size);
        return header;
    }
}

interface Car {
    String race(String msg);
}

class MyHeader implements Serializable {
    int flag;
    long requestId;
    long dataLen;

    public int getFlag() {
        return flag;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }

    public long getRequestId() {
        return requestId;
    }

    public void setRequestId(long requestId) {
        this.requestId = requestId;
    }

    public long getDataLen() {
        return dataLen;
    }

    public void setDataLen(long dataLen) {
        this.dataLen = dataLen;
    }
}

class MyContent implements Serializable {
    String name;
    String methodName;
    Class<?>[] parameterTypes;
    Object[] args;
    String res;

    public String getRes() {
        return res;
    }

    public void setRes(String res) {
        this.res = res;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Class<?>[] getParameterTypes() {
        return parameterTypes;
    }

    public void setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }

    public Object[] getArgs() {
        return args;
    }

    public void setArgs(Object[] args) {
        this.args = args;
    }
}

class ClientFactory {
    int poolSize = 50;
    NioEventLoopGroup clientWorker;
    Random random = new Random();

    private static final ClientFactory factory;

    private ClientFactory() {
    }

    static {
        factory = new ClientFactory();
    }

    public static ClientFactory getFactory() {
        return factory;
    }

    // One pool of connections per server address.
    ConcurrentHashMap<InetSocketAddress, ClientPool> outboxes = new ConcurrentHashMap<>();

    public synchronized NioSocketChannel getClient(InetSocketAddress address) {
        ClientPool clientPool = outboxes.get(address);
        if (clientPool == null) {
            outboxes.putIfAbsent(address, new ClientPool(poolSize));
            clientPool = outboxes.get(address);
        }
        // Pick a random slot; reuse its connection if it is still alive.
        int i = random.nextInt(poolSize);
        if (clientPool.clients[i] != null && clientPool.clients[i].isActive()) {
            return clientPool.clients[i];
        }
        // Otherwise create a connection for that slot.
        synchronized (clientPool.locks[i]) {
            return clientPool.clients[i] = create(address);
        }
    }

    private NioSocketChannel create(InetSocketAddress address) {
        clientWorker = new NioEventLoopGroup(1);
        Bootstrap bs = new Bootstrap();
        ChannelFuture connect = bs.group(clientWorker)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) {
                        ChannelPipeline pipeline = nioSocketChannel.pipeline();
                        // Responses need the same decoding as requests.
                        pipeline.addLast(new MyDecoder());
                        pipeline.addLast(new ClientResponses());
                    }
                })
                .connect(address);
        try {
            NioSocketChannel client = (NioSocketChannel) connect.sync().channel();
            return client;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
}

class ClientPool {
    NioSocketChannel[] clients;
    Object[] locks;

    ClientPool(int size) {
        clients = new NioSocketChannel[size];
        locks = new Object[size];
        for (int i = 0; i < size; i++) {
            locks[i] = new Object();
        }
    }
}

class ClientResponses extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        MyDataPackage data = (MyDataPackage) msg;
        ResponseMappingHandler.runCallback(data);
    }
}

class ResponseMappingHandler {
    static ConcurrentHashMap<Long, CompletableFuture<String>> mapping = new ConcurrentHashMap<>();

    public static void addCallback(long requestId, CompletableFuture<String> cb) {
        mapping.putIfAbsent(requestId, cb);
    }

    public static void runCallback(MyDataPackage data) {
        // Remove and complete in one step; ignore responses nobody is waiting for.
        CompletableFuture<String> cb = mapping.remove(data.getHeader().getRequestId());
        if (cb != null) {
            cb.complete(data.getContent().getRes());
        }
    }

    public static void removeCallback(long requestId) {
        mapping.remove(requestId);
    }
}

class ServerRequestHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        MyDataPackage data = (MyDataPackage) msg;
        String ioThreadName = Thread.currentThread().getName();

        // Hand the work to another event loop of the same group, so the thread
        // that read the request is not necessarily the one that processes it.
        ctx.executor().parent().next().execute(() -> {
            String execThreadName = Thread.currentThread().getName();
            MyContent content = new MyContent();
            String s = "io thread: " + ioThreadName
                    + " exec thread: " + execThreadName
                    + " from args: " + data.getContent().getArgs()[0];
            content.setRes(s);
            // MySerializeUtil.serialize is a helper whose listing is not part of the code above;
            // it has to produce Java-serialized bytes, since MyDecoder reads them with ObjectInputStream.
            byte[] contentByte = MySerializeUtil.serialize(content);

            MyHeader header = new MyHeader();
            header.setFlag(0x14141424); // flag value marking a response
            header.setRequestId(data.getHeader().getRequestId());
            header.setDataLen(contentByte.length);
            byte[] headerByte = MySerializeUtil.serialize(header);

            ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(headerByte.length + contentByte.length);
            byteBuf.writeBytes(headerByte);
            byteBuf.writeBytes(contentByte);
            ctx.writeAndFlush(byteBuf);
        });
    }
}

class MyDecoder extends ByteToMessageDecoder {
    // Size in bytes of a Java-serialized MyHeader.
    private static final int HEADER_LEN = 82;

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        while (byteBuf.readableBytes() >= HEADER_LEN) {
            // Peek at the header: getBytes does not move the reader index.
            byte[] bytes = new byte[HEADER_LEN];
            byteBuf.getBytes(byteBuf.readerIndex(), bytes);
            ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
            ObjectInputStream objectInputStream = new ObjectInputStream(bis);
            MyHeader myHeader = (MyHeader) objectInputStream.readObject();

            // The header is still unread at this point, so the body is complete
            // only if the bytes after the header cover dataLen.
            if (byteBuf.readableBytes() - HEADER_LEN >= myHeader.getDataLen()) {
                // Consume the header (skipBytes avoids allocating a throwaway ByteBuf).
                byteBuf.skipBytes(HEADER_LEN);
                byte[] data = new byte[(int) myHeader.getDataLen()];
                byteBuf.readBytes(data);
                ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data);
                ObjectInputStream ois = new ObjectInputStream(byteArrayInputStream);

                if (myHeader.getFlag() == 0x14141414) {
                    // Request packet (decoded on the server side).
                    MyContent myContent = (MyContent) ois.readObject();
                    list.add(new MyDataPackage(myHeader, myContent));
                } else if (myHeader.getFlag() == 0x14141424) {
                    // Response packet (decoded on the client side).
                    MyContent myContent = (MyContent) ois.readObject();
                    list.add(new MyDataPackage(myHeader, myContent));
                }
            } else {
                // Incomplete packet: leave the bytes in the buffer until more data arrives.
                break;
            }
        }
    }
}

class MyDataPackage {
    private MyHeader header;
    private MyContent content;

    public MyDataPackage(MyHeader myHeader, MyContent myContent) {
        this.header = myHeader;
        this.content = myContent;
    }

    public MyHeader getHeader() {
        return header;
    }

    public void setHeader(MyHeader header) {
        this.header = header;
    }

    public MyContent getContent() {
        return content;
    }

    public void setContent(MyContent content) {
        this.content = content;
    }
}
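The server handler calls a MySerializeUtil helper that is not included in the listing. What follows is one possible sketch, not the author's version: it assumes the helper simply wraps Java object serialization and rethrows IO errors unchecked, because it is called inside a lambda without a try/catch. The `deserialize` method, the `SketchHeader` class, and `headerLength` are hypothetical additions for illustration. They show that a header with only primitive fields always serializes to the same number of bytes, which is what lets the decoder work with a fixed header size such as 82. The exact number depends on the class's fully qualified name and its field names, so measuring it is safer than hard-coding it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class MySerializeUtil {

    // Java-serialize an object into a byte array.
    static byte[] serialize(Object obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Inverse of serialize (hypothetical; not referenced by the article's code).
    static Object deserialize(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Hypothetical stand-in with the same field types as MyHeader.
    static class SketchHeader implements Serializable {
        int flag;
        long requestId;
        long dataLen;

        SketchHeader(int flag, long requestId, long dataLen) {
            this.flag = flag;
            this.requestId = requestId;
            this.dataLen = dataLen;
        }
    }

    // Measure the serialized header size once instead of hard-coding it.
    static int headerLength() {
        return serialize(new SketchHeader(0, 0L, 0L)).length;
    }

    public static void main(String[] args) {
        System.out.println("serialized header length: " + headerLength());
    }
}
```

A decoder could read such a measured value into a constant at class-load time, so that renaming the header class or one of its fields does not silently break framing.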