Skip to content

Commit

Permalink
V1.1.0
Browse files Browse the repository at this point in the history
add monitoring function
  • Loading branch information
JonZhang3 committed Nov 19, 2020
1 parent 1b69b13 commit 4cdd282
Show file tree
Hide file tree
Showing 16 changed files with 750 additions and 29 deletions.
67 changes: 62 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
### 主要特性
1. 操作简便,采用链式调用
2. 多种任务类型:可设置回调的任务,可获取返回结果的任务,任务组
3. 可自定义任务的类型,ID,超时时间等参数
3. 可自定义任务的类型,超时时间等参数
4. 灵活,可阻塞当前线程,亦可设置回调方法
5. 实时获取正在运行任务状态,对已完成任务进行自定义处理
6. 可实时监控任务执行情况

### 使用说明
#### 引入
Expand All @@ -20,7 +21,7 @@ Maven
<dependency>
<groupId>com.github.jonzhang3</groupId>
<artifactId>aTask</artifactId>
<version>1.0.2</version>
<version>1.1.0</version>
</dependency>
```
#### 示例
Expand All @@ -40,7 +41,7 @@ TaskEngine engine = new TaskEngine.Builder()
// 可通过 getRunningTasks() 方法获取当前正在执行的任务
```

##### 1. 回调处理任务执行结果的任务的使用
##### 1. 使用回调处理执行结果的任务的使用
```java
Task task = engine.buildTask(ctx -> {
ctx.onProgress(100);// 设置进度值,将调用进度回调函数
Expand All @@ -49,7 +50,6 @@ Task task = engine.buildTask(ctx -> {
// 如果 onSuccess 和 onError 都调用了,则第首先执行的方法将会调用成功
})
.type("type")// 设置任务的类型
.id("id")// 设置任务的 ID
.progress(progress -> {})// 设置任务的进度回调
// 设置任务的结果回调
// 如果任务执行失败,error 则不为 null;如果任务执行成功,error 则为 null
Expand All @@ -72,7 +72,6 @@ ResultTask<String> resultTask = engine.buildResultTask(ctx -> {
return "success";// 返回结果数据
})
.type("type")
.id("id")
.progress(i -> {})
.build();
engine.go(resultTask);
Expand All @@ -93,3 +92,61 @@ group.await();// 等待线程组中所有的任务执行完成
group.getCounter();// 获取计数器的结果
Data data = group.getData();// 获取组中任务执行时设置的数据(线程安全)
```
##### 4. 监控页面的使用

##### 第一种方式:

注册一个 ServletRegistrationBean 类
```java
@Bean
public ServletRegistrationBean<StatViewServlet> druidStatViewServlet() {
TaskEngine engine = new TaskEngine.Builder()
.build();
engine.prepareGroup("test");
// 设置 TaskEngine
TaskStatService.setTaskEngine(engine);
// 注册 StatViewServlet,并为其设置路径
ServletRegistrationBean<StatViewServlet> registrationBean =
new ServletRegistrationBean<>(new StatViewServlet(), "/atask/*");
registrationBean.addInitParameter("username", "root");// 设置登录用户名
registrationBean.addInitParameter("password", "1234");// 设置登录密码
return registrationBean;
}
```

##### 第二种方式:

首先新建一个 `TaskEngine`, 然后将其设置到 `TaskStatService`
```java
// 也可以采用其他方式
static TaskEngine engine;
static {
engine = new TaskEngine.Builder()
.build();
TaskStatService.setTaskEngine(engine);
}
```
然后新建一个 `Servlet` 类,并添加 `@WebServlet` 注解
```java
@WebServlet(
urlPatterns = "/atask/*",
initParams = {
@WebInitParam(name = "username", value = "admin"),
@WebInitParam(name = "password", value = "123456"),
}
)
public class ATaskStatServlet extends StatViewServlet {

}
```
不要忘了在启动类上添加 `@ServletComponentScan` 注解
```java
@SpringBootApplication
@ServletComponentScan
public class DemoApplication {
}
```

监控页面:

![监控页面](https://jonzhang-3.gitee.io/pics/atask/monitor.jpg)
9 changes: 8 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.github.jonzhang3</groupId>
<artifactId>aTask</artifactId>
<version>1.0.2</version>
<version>1.1.0</version>

<name>ATask</name>
<description>A simple multi-purpose asynchronous task execution framework.</description>
Expand Down Expand Up @@ -50,6 +50,13 @@
<artifactId>tuples</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.1.0</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/com/atask/AbstractTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import com.atask.util.Assert;
import com.atask.util.Utils;

import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand All @@ -33,7 +32,7 @@ protected AbstractTask(String type, String id) {
this.type = type;
}
if (Utils.isEmpty(id)) {
this.id = UUID.randomUUID().toString();
this.id = Utils.generateId();
} else {
this.id = id;
}
Expand Down
62 changes: 53 additions & 9 deletions src/main/java/com/atask/DefaultThreadPoolExecutor.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package com.atask;

import java.util.Collection;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingDeque;
Expand All @@ -13,10 +16,16 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class DefaultThreadPoolExecutor extends ThreadPoolExecutor {
final class DefaultThreadPoolExecutor extends ThreadPoolExecutor {

// 统计运行的任务总数量
private final AtomicLong taskNumber = new AtomicLong(0);
// 统计完成的任务总数量
private final AtomicLong completedTaskNumber = new AtomicLong(0);
private final Deque<Task> runningQueue = new ConcurrentLinkedDeque<>();
private final Map<String, TaskGroup> runningTaskGrous = new ConcurrentHashMap<>();
private final LinkedBlockingDeque<Task> completedQueue = new LinkedBlockingDeque<>();

private final CompletedTaskHandler completedTaskHandler;
Expand All @@ -37,14 +46,14 @@ public void submit(Task task) {
if (task instanceof BaseTask) {
BaseTask bTask = (BaseTask) task;
TaskExecutor taskExecutor = new TaskExecutor(bTask);
RunnableFuture<Object> future = newTaskFor(task, taskExecutor, null);
RunnableFuture<Object> future = newTaskFor(task, taskExecutor);
bTask.setFuture(future);
bTask.setState(State.INIT, State.QUEUED);
execute(future);
} else if (task instanceof ResultBaseTask) {
ResultBaseTask bTask = (ResultBaseTask) task;
ResultTaskExecutor executor = new ResultTaskExecutor(bTask);
RunnableFuture future = newTaskFor(task, executor);
ResultBaseTask<?> bTask = (ResultBaseTask<?>) task;
ResultTaskExecutor<?> executor = new ResultTaskExecutor<>(bTask);
RunnableFuture<?> future = newTaskFor(task, executor);
bTask.setFuture(future);
bTask.setState(State.INIT, State.QUEUED);
execute(future);
Expand All @@ -54,7 +63,7 @@ public void submit(Task task) {

public void submit(TaskGroup.Item item, TaskGroup group) {
TaskGroup.ItemExecutor executor = new TaskGroup.ItemExecutor(item, group);
RunnableFuture<Object> future = newTaskFor(item, executor, null);
RunnableFuture<Object> future = newTaskFor(item, executor);
item.setFuture(future);
item.setState(State.INIT, State.QUEUED);
execute(future);
Expand All @@ -67,8 +76,13 @@ private <T> RunnableFuture<T> newTaskFor(Task task, Callable<T> callable) {
}


private <T> RunnableFuture<T> newTaskFor(Task task, Runnable runnable, T value) {
return new CustomFutureTask<>(task, runnable, value);
private <T> RunnableFuture<T> newTaskFor(Task task, Runnable runnable) {
return new CustomFutureTask<>(task, runnable, null);
}

@Override
protected void beforeExecute(Thread t, Runnable r) {
taskNumber.incrementAndGet();
}

@Override
Expand All @@ -78,6 +92,7 @@ protected void afterExecute(Runnable r, Throwable t) {
Task task = futureTask.getTask();
runningQueue.remove(task);
completedQueue.offer(task);
completedTaskNumber.incrementAndGet();
}
}

Expand All @@ -93,10 +108,39 @@ private void startHandleCompletedTask() {
}).start();
}

public final List<Task> getRunningTasks() {
// 获取正在运行的任务,包含任务组中的任务
protected final List<Task> getRunningTasks() {
return new LinkedList<>(runningQueue);
}

// 获取正在运行的任务总量,包含任务组中的任务
protected int getRunningNumberofTask() {
return runningQueue.size();
}

// 获取已经完成的任务总量,包含任务组中的任务
protected long getCompletedNumberOfTask() {
return completedTaskNumber.get();
}

// 获取执行的任务总量
protected long getTotalNumberOfTask() {
return taskNumber.get();
}

protected void addTaskGroup(TaskGroup taskGroup) {
this.runningTaskGrous.put(taskGroup.getId(), taskGroup);
}

protected void removeTaskGroup(TaskGroup taskGroup) {
this.runningTaskGrous.remove(taskGroup.getId());
}

// 获取正在运行的任务组
protected Collection<TaskGroup> getRunningTaskGroups() {
return new LinkedList<>(this.runningTaskGrous.values());
}

private static class CustomFutureTask<V> extends FutureTask<V> {

private final Task task;
Expand Down
80 changes: 80 additions & 0 deletions src/main/java/com/atask/Json.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.atask;

final class Json {

protected static JsonObject createObject() {
return new JsonObject();
}

protected static JsonArray createArray() {
return new JsonArray();
}

static class JsonObject {
private final StringBuilder builder = new StringBuilder();

private JsonObject() {
builder.append("{");
}

public JsonObject put(String name, Object value) {
builder.append("\"").append(name).append("\":");
if (value == null) {
builder.append("null");
} else if (value instanceof CharSequence) {
builder.append("\"").append(value.toString()).append("\"");
} else {
builder.append(value.toString());
}
builder.append(",");
return this;
}

public JsonObject end() {
if(builder.charAt(builder.length() - 1) == ',') {
builder.deleteCharAt(builder.length() - 1);
}
builder.append("}");
return this;
}

@Override
public String toString() {
return builder.toString();
}
}

static class JsonArray {
private final StringBuilder builder = new StringBuilder();

private JsonArray() {
builder.append("[");
}

public JsonArray add(Object value) {
if (value == null) {
builder.append("null");
} else if (value instanceof CharSequence) {
builder.append("\"").append(value).append("\"");
} else {
builder.append(value.toString());
}
builder.append(",");
return this;
}

public JsonArray end() {
if(builder.charAt(builder.length() - 1) == ',') {
builder.deleteCharAt(builder.length() - 1);
}
builder.append("]");
return this;
}

@Override
public String toString() {
return builder.toString();
}
}

}
Loading

0 comments on commit 4cdd282

Please # to comment.