import java.util.Collection;
import java.util.Vector;
/**
* 任务分发器
*/
public class TaskManage extends Thread
{
protected Vector<Runnable> tasks = new Vector<Runnable>();
protected boolean running = false;
protected boolean stopped = false;
protected boolean paused = false;
protected boolean killed = false;
private ThreadPool pool;
public TaskManage(ThreadPool pool)
{
this.pool = pool;
}
public void putTask(Runnable task)
{
tasks.add(task);
}
public void putTasks(Runnable[] tasks)
{
for (int i = 0; i < tasks.length; i++)
this.tasks.add(tasks[i]);
}
public void putTasks(Collection<Runnable> tasks)
{
this.tasks.addAll(tasks);
}
protected Runnable popTask()
{
if (tasks.size() > 0) return (Runnable) tasks.remove(0);
else return null;
}
public boolean isRunning()
{
return running;
}
public void stopTasks()
{
stopped = true;
}
public void stopTasksSync()
{
stopTasks();
while (isRunning())
{
try
{
sleep(5);
}
catch (InterruptedException e)
{
TaskException.getResultMessage(e);
}
}
}
public void pauseTasks()
{
paused = true;
}
public void pauseTasksSync()
{
pauseTasks();
while (isRunning())
{
try
{
sleep(5);
}
catch (InterruptedException e)
{
TaskException.getResultMessage(e);
}
}
}
public void kill()
{
if (!running) interrupt();
else killed = true;
}
public void killSync()
{
kill();
while (isAlive())
{
try
{
sleep(5);
}
catch (InterruptedException e)
{
TaskException.getResultMessage(e);
}
}
}
public synchronized void startTasks()
{
running = true;
this.notify();
}
public synchronized void run()
{
try
{
while (true)
{
if (!running || tasks.size() == 0)
{
pool.notifyForIdleThread();
this.wait();
}
else
{
Runnable task;
while ((task = popTask()) != null)
{
task.run();
if (stopped)
{
stopped = false;
if (tasks.size() > 0)
{
tasks.clear();
System.out.println(Thread.currentThread().getId() + ": Tasks are stopped");
break;
}
}
if (paused)
{
paused = false;
if (tasks.size() > 0)
{
System.out.println(Thread.currentThread().getId() + ": Tasks are paused");
break;
}
}
}
running = false;
}
if (killed)
{
killed = false;
break;
}
}
}
catch (InterruptedException e)
{
TaskException.getResultMessage(e);
return;
}
}
}
import java.util.Collection;
import java.util.Iterator;
import java.util.Vector;
/**
* 线程池
*/
public class ThreadPool
{
protected int maxPoolSize = TaskConfig.maxPoolSize;
protected int initPoolSize = TaskConfig.initPoolSize;
protected Vector<TaskManage> threads = new Vector<TaskManage>();
protected boolean initialized = false;
protected boolean hasIdleThread = false;
public ThreadPool()
{
super();
}
public ThreadPool(int maxPoolSize, int initPoolSize)
{
this.maxPoolSize = maxPoolSize;
this.initPoolSize = initPoolSize;
}
public void init()
{
initialized = true;
for (int i = 0; i < initPoolSize; i++)
{
TaskManage thread = new TaskManage(this);
thread.start();
threads.add(thread);
}
}
public void setMaxPoolSize(int maxPoolSize)
{
this.maxPoolSize = maxPoolSize;
if (maxPoolSize < getPoolSize()) setPoolSize(maxPoolSize);
}
/**
* 重设当前线程数 若需杀掉某线程,线程不会立刻杀掉,而会等到线程中的事
* 务处理完成 但此方法会立刻从线程池中移除该线程,不会等待事务处理结束
*/
public void setPoolSize(int size)
{
if (!initialized)
{
initPoolSize = size;
return;
}
else if (size > getPoolSize())
{
for (int i = getPoolSize(); i < size && i < maxPoolSize; i++)
{
TaskManage thread = new TaskManage(this);
thread.start();
threads.add(thread);
}
}
else if (size < getPoolSize())
{
while (getPoolSize() > size)
{
TaskManage th = (TaskManage) threads.remove(0);
th.kill();
}
}
}
public int getPoolSize()
{
return threads.size();
}
protected void notifyForIdleThread()
{
hasIdleThread = true;
}
protected boolean waitForIdleThread()
{
hasIdleThread = false;
while (!hasIdleThread && getPoolSize() >= maxPoolSize)
{
try
{
Thread.sleep(5);
}
catch (InterruptedException e)
{
TaskException.getResultMessage(e);
return false;
}
}
return true;
}
public synchronized TaskManage getIdleThread()
{
while (true)
{
for (Iterator<TaskManage> itr = threads.iterator(); itr.hasNext();)
{
TaskManage th = (TaskManage) itr.next();
if (!th.isRunning()) return th;
}
if (getPoolSize() < maxPoolSize)
{
TaskManage thread = new TaskManage(this);
thread.start();
threads.add(thread);
return thread;
}
if (waitForIdleThread() == false) return null;
}
}
public void processTask(Runnable task)
{
TaskManage th = getIdleThread();
if (th != null)
{
th.putTask(task);
th.startTasks();
}
}
public void processTasksInSingleThread(Runnable[] tasks)
{
TaskManage th = getIdleThread();
if (th != null)
{
th.putTasks(tasks);
th.startTasks();
}
}
public void processTasksInSingleThread(Collection<Runnable> tasks)
{
TaskManage th = getIdleThread();
if (th != null)
{
th.putTasks(tasks);
th.startTasks();
}
}
}
public class TopTask implements Runnable
{
private ThreadPool pool;
public TopTask()
{
super();
}
public TopTask(ThreadPool pool)
{
super();
this.pool = pool;
}
@Override
public void run()
{
init();
start();
}
/**
* 初始化验证权限、参数之类
*/
public void init()
{
}
/**
* 开始自动任务
*/
public void start()
{
for (int i = 0; i < 10; i++)
{
pool.processTask(new BeginAuto());
}
}
}
/**
* 实现类
*/
class BeginAuto implements Runnable
{
@Override
public void run()
{
System.out.println(Thread.currentThread().getId() + "..................");
}
}
新闻热点
疑难解答