serverListenSocket = new ServerSocket(PORT);
serverListenSocket.setReuseAddress(true);
serverListenSocket.setReuseAddress(true);
服务新连接
当有新连接建立时,accept返回时,将服务任务提交给线程池执行。
while(true){
Socket socket = serverListenSocket.accept();
pool.execute(new ServiceThread(socket));
}
这里使用线程池对象来执行线程,减少了每次线程创建和销毁的开销。任务执行完毕,线程释放到线程池。
服务任务
服务线程ServiceThread维护一个count来记录服务线程被调用的次数。每当服务任务被调用一次时,count的值自增1,因此ServiceThread提供一个increaseCount和getCount的方法,分别将count值自增1和取得该count值。由于可能多个线程存在竞争,同时访问count,因此需要加锁机制,在Java 5之前,我们只能使用synchronized来锁定。Java 5中引入了性能更加粒度更细的重入锁ReentrantLock。我们使用ReentrantLock保证代码线程安全。下面是具体代码:
private static ReentrantLock lock = new ReentrantLock ();
private static int count = 0;
private int getCount(){
int ret = 0;
try{
lock.lock();
ret = count;
}finally{
lock.unlock();
}
return ret;
}
private void increaseCount(){
try{
lock.lock();
++count;
}finally{
lock.unlock();
}
}
服务线程在开始给客户端打印一个欢迎信息,
increaseCount();
int curCount = getCount();
helloString = "hello, id = " + curCount+" ";
dos = new DataOutputStream(connectedSocket.getOutputStream());
dos.write(helloString.getBytes());
服务器端的完整实现
服务器端的完整实现代码如下:
package com.andrew;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class Server {
private static int produceTaskSleepTime = 100;
private static int consumeTaskSleepTime = 1200;
private static int produceTaskMaxNumber = 100;
private static final int CORE_POOL_SIZE = 2;
private static final int MAX_POOL_SIZE = 100;
private static final int KEEPALIVE_TIME = 3;
private static final int QUEUE_CAPACITY = (CORE_POOL_SIZE + MAX_POOL_SIZE) / 2;
private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
private static final String HOST = "127.0.0.1";
private static final int PORT = 19527;
private BlockingQueue workQueue = new ArrayBlockingQueue(QUEUE_CAPACITY);
//private ThreadPoolExecutor serverThreadPool = null;
private ExecutorService pool = null;
private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
private ServerSocket serverListenSocket = null;
private int times = 5;
public void start() {
// You can also init thread pool in this way.
/*serverThreadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,
MAX_POOL_SIZE, KEEPALIVE_TIME, TIME_UNIT, workQueue,
rejectedExecutionHandler);*/
pool = Executors.newFixedThreadPool(10);
try {
serverListenSocket = new ServerSocket(PORT);
serverListenSocket.setReuseAddress(true);
System.out.println("I'm listening");
while (times-- 0) {
Socket socket = serverListenSocket.accept();
String welcomeString = "hello";
//serverThreadPool.execute(new ServiceThread(socket, welcomeString));
pool.execute(new ServiceThread(socket));
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
cleanup();
}
public void cleanup() {
if (null != serverListenSocket) {
try {
serverListenSocket.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
//serverThreadPool.shutdown();
pool.shutdown();
}
public static void main(String args[]) {
Server server = new Server();
server.start();
}
}
class ServiceThread implements Runnable, Serializable {
private static final long serialVersionUID = 0;
private Socket connectedSocket = null;
private String helloString = null;
private static int count = 0;
private static ReentrantLock lock = new ReentrantLock();
ServiceThread(Socket socket) {
connectedSocket = socket;
}
public void run() {
increaseCount();
int curCount = getCount();
helloString = "hello, id = " + curCount + "";
ExecutorService executor = Executors.newSingleThreadExecutor();
Future future = executor.submit(new TimeConsumingTask());
DataOutputStream dos = null;
try {
dos = new DataOutputStream(connectedSocket.getOutputStream());
dos.write(helloString.getBytes());
try {
dos.write("let's do soemthing other.".getBytes());
String result = future.get();
dos.write(result.getBytes());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
if (null != connectedSocket) {
try {
connectedSocket.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (null != dos) {
try {
dos.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
executor.shutdown();
}
}
private int getCount() {
int ret = 0;
try {
lock.lock();
ret = count;
} finally {
lock.unlock();
}
return ret;
}
private void increaseCount() {
try {
lock.lock();
++count;
} finally {
lock.unlock();
}
}
}
class TimeConsumingTask implements Callable {
public String call() throws Exception {
System.out.println("It's a time-consuming task, you'd better retrieve your result in the furture");
return "ok, here's the result: It takes me lots of time to produce this result";
}
}