当 ServerSocketChannel
与 SockelChannel
采用默认的阻塞模式时,为了同时处理多个客户的连接,必须使用多线程
public class EchoServer {
private int port = 8000;
private ServerSocketChannel serverSocketChannel = null;
private ExecutorService executorService; //线程池
private static final int POOL_MULTIPLE = 4; //线程池中工作线程的数目
public EchoServer() throws IOException {
//创建一个线程池
executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors() * POOL_MULTIPLE);
//创建一个ServerSocketChannel对象
serverSocketChannel = ServerSocketChannel.open();
//使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时,可以顺利绑定相同的端口
serverSocketChannel.socket().setReuseAddress(true);
//把服务器进程与一个本地端口绑定
serverSocketChannel.socket().bind(new InetSocketAddress(port));
System.out.println("服务器启动");
}
public void service() {
while (true) {
SocketChannel socketChannel = null;
try {
socketChannel = serverSocketChannel.accept();
//处理客户连接
executorService.execute(new Handler(socketChannel));
} catch(IOException e) {
e.printStackTrace();
}
}
}
public static void main(String args[])throws IOException {
new EchoServer().service();
}
//处理客户连按
class Handler implements Runnable {
private SocketChannel socketChannel;
public Handler(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
public void run() {
handle(socketChannel);
}
public void handle(SocketChannel socketChannel) {
try {
//获得与socketChannel关联的Socket对象
Socket socket = socketChannel.socket();
System.out.println("接收到客户连接,来自:" + socket.getInetAddress() + ":" + socket.getPort());
BufferedReader br = getReader(socket);
PrintWriter pw = getWriter(socket);
String msg = null;
while ((msg = br.readLine()) != null) {
System.out.println(msg);
pw.println(echo(msg));
if (msg.equals("bye")) {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if(socketChannel != null) {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
private PrintWriter getWriter(Socket socket) throws IOException {
OutputStream socketOut = socket.getOutputStream();
return new PrintWriter(socketOut,true);
}
private BufferedReader getReader(Socket socket) throws IOException {
InputStream socketIn = socket.getInputStream();
return new BufferedReader(new InputStreamReader(socketIn));
}
public String echo(String msg) {
return "echo:" + msg;
}
}
在非阻塞模式下,EchoServer
只需要启动一个主线程,就能同时处理三件事:
EchoServer
委托 Selector
来负责监控接收连接就绪事件、读就绪事件和写就绪事件如果有特定事件发生,就处理该事件
// 创建一个Selector对象
selector = Selector.open();
//创建一个ServerSocketChannel对象
serverSocketChannel = ServerSocketChannel.open();
//使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时
//可以顺利绑定到相同的端口
serverSocketChannel.socket().setReuseAddress(true);
//使ServerSocketChannel工作于非阻塞模式
serverSocketChannel.configureBlocking(false):
//把服务器进程与一个本地端口绑定
serverSocketChannelsocket().bind(new InetSocketAddress(port));
EchoServer
类的 service()
方法负责处理本节开头所说的三件事,体现其主要流程的代码如下:
public void service() throws IOException {
serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT);
//第1层while循环
while(selector.select() > 0) {
//获得Selector的selected-keys集合
Set readyKeys = selector.selectedKeys();
Iterator it = readyKeys.iterator();
//第2层while循环
while (it.hasNext()) {
SelectionKey key = null;
//处理SelectionKey
try {
//取出一个SelectionKey
key = (SelectionKey) it.next();
//把 SelectionKey从Selector 的selected-key 集合中删除
it.remove();
1f (key.isAcceptable()) { 处理接收连接就绪事件; }
if (key.isReadable()) { 处理读就绪水件; }
if (key.isWritable()) { 处理写就绪事件; }
} catch(IOException e) {
e.printStackTrace();
try {
if(key != null) {
//使这个SelectionKey失效
key.cancel();
//关闭与这个SelectionKey关联的SocketChannel
key.channel().close();
}
} catch(Exception ex) {
e.printStackTrace();
}
}
}
}
}
ServerSocketChannel
向 Selector
注册接收连接就绪事件,如果 Selector
监控到该事件发生,就会把相应的 SelectionKey
对象加入 selected-keys
集合Selector
已经发生的事件,select()
方法返回当前相关事件已经发生的 SelectionKey
的个数,如果当前没有任何事件发生,该方法会阻塞下去,直到至少有一个事件发生。Selector
的 selectedKeys()
方法返回 selected-keys
集合,它存放了相关事件已经发生的 SelectionKey
对象selected-keys
集合中依次取出每个 SelectionKey
对象并从集合中删除,,然后调用 isAcceptable()
、isReadable()
和 isWritable()
方法判断到底是哪种事件发生了,从而做出相应的处理if (key.isAcceptable()) {
//获得与SelectionKey关联的ServerSocketChannel
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//获得与客户连接的SocketChannel
SocketChannel socketChannel = (SocketChannel) ssc.accept();
//把Socketchannel设置为非阻塞模式
socketChannel.configureBlocking(false);
//创建一个用于存放用户发送来的数据的级冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//Socketchannel向Selector注册读就绪事件和写就绪事件
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
}
public void receive(SelectionKey key) throws IOException {
//获得与SelectionKey关联的附件
ByteBuffer buffer = (ByteBuffer) key.attachment();
//获得与SelectionKey关联的Socketchannel
SocketChannel socketChannel = (SocketChannel)key.channel();
//创建一个ByteBuffer用于存放读到的数据
ByteBuffer readBuff = ByteBuffer.allocate(32);
socketChannel.read(readBuff);
readBuff.flip();
//把buffer的极限设为容量
buffer.limit(buffer.capacity());
//把readBuff中的内容拷贝到buffer
buffer.put(readBuff);
}
public void send(SelectionKey key) throws IOException {
//获得与SelectionKey关联的ByteBuffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
//获得与SelectionKey关联的SocketChannel
SocketChannel socketChannel = (SocketChannel) key.channel();
buffer.flip();
//按照GBK编码把buffer中的字节转换为字符串
String data = decode(buffer);
//如果还没有读到一行数据就返回
if(data.indexOf("\r\n") == -1)
return;
//截取一行数据
String outputData = data.substring(0, data.indexOf("\n") + 1);
//把输出的字符串按照GBK编码转换为字节,把它放在outputBuffer中
ByteBuffer outputBuffer = encode("echo:" + outputData);
//输出outputBuffer的所有字节
while(outputBuffer,hasRemaining())
socketChannel.write(outputBuffer);
//把outputData字符审按照GBK编码,转换为字节,把它放在ByteBuffer
ByteBuffer temp = encode(outputData);
//把buffer的位置设为temp的极限
buffer.position(temp.limit()):
//删除buffer已经处理的数据
buffer.compact();
//如果已经输出了字符串“bye\r\n”,就使SelectionKey失效,并关闭SocketChannel
if(outputData.equals("bye\r\n")) {
key.cancel();
socketChannel.close();
}
}
完整代码如下:
public class EchoServer {
private int port = 8000;
private ServerSocketChannel serverSocketChannel = null;
private Selector selector;
private Charset charset = Charset.forName("GBK");
public EchoServer() throws IOException {
// 创建一个Selector对象
selector = Selector.open();
//创建一个ServerSocketChannel对象
serverSocketChannel = ServerSocketChannel.open();
//使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时
//可以顺利绑定到相同的端口
serverSocketChannel.socket().setReuseAddress(true);
//使ServerSocketChannel工作于非阻塞模式
serverSocketChannel.configureBlocking(false):
//把服务器进程与一个本地端口绑定
serverSocketChannelsocket().bind(new InetSocketAddress(port));
}
public void service() throws IOException {
serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT);
//第1层while循环
while(selector.select() > 0) {
//获得Selector的selected-keys集合
Set readyKeys = selector.selectedKeys();
Iterator it = readyKeys.iterator();
//第2层while循环
while (it.hasNext()) {
SelectionKey key = null;
//处理SelectionKey
try {
//取出一个SelectionKey
key = (SelectionKey) it.next();
//把 SelectionKey从Selector 的selected-key 集合中删除
it.remove();
1f (key.isAcceptable()) {
//获得与SelectionKey关联的ServerSocketChannel
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//获得与客户连接的SocketChannel
SocketChannel socketChannel = (SocketChannel) ssc.accept();
//把Socketchannel设置为非阻塞模式
socketChannel.configureBlocking(false);
//创建一个用于存放用户发送来的数据的级冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//Socketchannel向Selector注册读就绪事件和写就绪事件
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
}
if (key.isReadable()) { receive(key); }
if (key.isWritable()) { send(key); }
} catch(IOException e) {
e.printStackTrace();
try {
if(key != null) {
//使这个SelectionKey失效
key.cancel();
//关闭与这个SelectionKey关联的SocketChannel
key.channel().close();
}
} catch(Exception ex) {
e.printStackTrace();
}
}
}
}
}
public void receive(SelectionKey key) throws IOException {
//获得与SelectionKey关联的附件
ByteBuffer buffer = (ByteBuffer) key.attachment();
//获得与SelectionKey关联的Socketchannel
SocketChannel socketChannel = (SocketChannel)key.channel();
//创建一个ByteBuffer用于存放读到的数据
ByteBuffer readBuff = ByteBuffer.allocate(32);
socketChannel.read(readBuff);
readBuff.flip();
//把buffer的极限设为容量
buffer.limit(buffer.capacity());
//把readBuff中的内容拷贝到buffer
buffer.put(readBuff);
}
public void send(SelectionKey key) throws IOException {
//获得与SelectionKey关联的ByteBuffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
//获得与SelectionKey关联的SocketChannel
SocketChannel socketChannel = (SocketChannel) key.channel();
buffer.flip();
//按照GBK编码把buffer中的字节转换为字符串
String data = decode(buffer);
//如果还没有读到一行数据就返回
if(data.indexOf("\r\n") == -1)
return;
//截取一行数据
String outputData = data.substring(0, data.indexOf("\n") + 1);
//把输出的字符串按照GBK编码转换为字节,把它放在outputBuffer中
ByteBuffer outputBuffer = encode("echo:" + outputData);
//输出outputBuffer的所有字节
while(outputBuffer,hasRemaining())
socketChannel.write(outputBuffer);
//把outputData字符审按照GBK编码,转换为字节,把它放在ByteBuffer
ByteBuffer temp = encode(outputData);
//把buffer的位置设为temp的极限
buffer.position(temp.limit()):
//删除buffer已经处理的数据
buffer.compact();
//如果已经输出了字符串“bye\r\n”,就使SelectionKey失效,并关闭SocketChannel
if(outputData.equals("bye\r\n")) {
key.cancel();
socketChannel.close();
}
}
//解码
public String decode(ByteBuffer buffer) {
CharBuffer charBuffer = charset.decode(buffer);
return charBuffer.toStrinq();
}
//编码
public ByteBuffer encode(String str) {
return charset.encode(str);
}
public static void main(String args[])throws Exception {
EchoServer server = new EchoServer();
server.service();
}
}
使用非阻塞模式时,ServerSocketChannel
以及 SocketChannel
都被设置为非阻塞模式,这使得接收连接、接收数据和发送数据的操作都采用非阻塞模式,EchoServer
采用一个线程同时完成这些操作
假如有许多客户请求连接,可以把接收客户连接的操作单独由一个线程完成,把接收数据和发送数据的操作由另一个线程完成,这可以提高服务器的并发性能
负责接收客户连接的线程按照阻塞模式工作,如果收到客户连接,就向 Selector
注册读就绪和写就绪事件,否则进入阻塞状态,直到接收到了客户的连接。负责接收数据和发送数据的线程按照非阻塞模式工作,只有在读就绪或写就绪事件发生时,才执行相应的接收数据和发送数据操作
public class EchoServer {
private int port = 8000;
private ServerSocketChannel serverSocketChannel = null;
private Selector selector = null;
private Charset charset = Charset.forName("GBK");
public EchoServer() throws IOException {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().setReuseAddress(true);
serverSocketChannelsocket().bind(new InetSocketAddress(port));
}
public void accept() {
while(true) {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
synchronized(gate) {
selector.wakeup();
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
}
} catch(IOException e) {
e.printStackTrace();
}
}
}
private Object gate=new Object();
public void service() throws IOException {
while(true) {
synchronized(gate){}
int n = selector.select();
if(n == 0) continue;
Set readyKeys = selector.selectedKeys();
Iterator it = readyKeys.iterator();
while (it.hasNext()) {
SelectionKey key = null;
try {
it.remove();
if (key.isReadable()) {
receive(key);
}
if (key.isWritable()) {
send(key);
}
} catch(IOException e) {
e.printStackTrace();
try {
if(key != null) {
key.cancel();
key.channel().close();
}
} catch(Exception ex) { e.printStackTrace(); }
}
}
}
}
public void receive(SelectionKey key) throws IOException {
...
}
public void send(SelectionKey key) throws IOException {
...
}
public String decode(ByteBuffer buffer) {
...
}
public ByteBuffer encode(String str) {
...
}
public static void main(String args[])throws Exception {
final EchoServer server = new EchoServer();
Thread accept = new Thread() {
public void run() {
server.accept();
}
};
accept.start();
server.service();
}
}
注意一点:主线程的 selector select()
方法和 Accept 线程的 register(...)
方法都会造成阻塞,因为他们都会操作 Selector
对象的共享资源 all-keys
集合,这有可能会导致死锁
导致死锁的具体情形是:Selector
中尚没有任何注册的事件,即 all-keys
集合为空,主线程执行 selector.select()
方法时将进入阻塞状态,只有当 Accept 线程向 Selector
注册了事件,并且该事件发生后,主线程才会从 selector.select()
方法返回。然而,由于主线程正在 selector.select()
方法中阻塞,这使得 Acccept
线程也在 register()
方法中阻塞。Accept 线程无法向 Selector 注册事件,而主线程没有任何事件可以监控,所以这两个线程将永远阻塞下去
为了避免对共享资源的竞争,同步机制使得一个线程执行 register()
时,不允许另一个线程同时执行 select()
方法,反之亦然
以上就是Java 网络编程 —— 实现非阻塞式的服务器的详细内容,更多关于Java 网络编程 —— 实现非阻塞式的服务器的资料请关注九品源码其它相关文章!