Hand-Rolling XXL-JOB (Part 4): Invoking Scheduled Tasks Remotely

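Java Socket Network Programming

Network programming is an important part of Java and covers both the server side and the client side. Socket is one of the basic building blocks of Java network programming: it provides two-way communication between applications through a standard interface for sending and receiving data over the network. In Java, sockets come in two kinds, client sockets and server sockets.

Client socket: a client socket communicates with a server socket. It connects to the server socket by specifying the server's IP address and port, then sends data to it.

Server socket: a server socket accepts connection requests from client sockets and communicates with them once connected. The server first creates a ServerSocket object bound to a local port (either by passing the port to the constructor, as in the example below, or by calling bind), then waits for client connection requests.

Below is a socket example. Server: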

package org.example.demo1;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {
public static void main(String[] args) {
    try {
        ServerSocket serverSocket = new ServerSocket(8000);
        System.out.println("Server started, waiting for client...");

        Socket socket = serverSocket.accept();
        System.out.println("Client connected.");

        BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        PrintWriter out = new PrintWriter(socket.getOutputStream(), true);

        String message;
        while ((message = in.readLine()) != null) {
            System.out.println("Client:" + message);
            out.println("Server received message:" + message);
        }

        in.close();
        out.close();
        socket.close();
        serverSocket.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

}

Client:

package org.example.demo1;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

public class Client {
public static void main(String[] args) {
    try {
        Socket socket = new Socket("localhost", 8000);
        System.out.println("Connected to server.");

        BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        PrintWriter out = new PrintWriter(socket.getOutputStream(), true);

        BufferedReader consoleIn = new BufferedReader(new InputStreamReader(System.in));
        String message;
        while ((message = consoleIn.readLine()) != null) {
            out.println(message);
            System.out.println("Server:" + in.readLine());
        }
        consoleIn.close();
        in.close();
        out.close();
        socket.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

}

Start the server first, then the client. Type some input in the client console: the server receives it and sends back the corresponding response.
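
To try the same request/response exchange without two consoles, the sketch below runs both ends in one process. It is only an illustration under a few assumptions: the class name LoopbackEchoDemo and the method roundTrip are made up for this example, the server handles a single connection, and port 0 is used so the operating system picks a free port.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical helper class, not part of the article's project.
class LoopbackEchoDemo {

    /** Starts a one-shot echo server, sends one line to it, and returns the reply. */
    static String roundTrip(String message) throws IOException, InterruptedException {
        // Port 0 asks the OS for any free port, so the demo never collides with port 8000.
        try (ServerSocket serverSocket = new ServerSocket(0)) {
            Thread server = new Thread(() -> {
                // try-with-resources closes the streams and the socket even on failure.
                try (Socket socket = serverSocket.accept();
                     BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                     PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {
                    out.println("Server received message:" + in.readLine());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            server.start();

            try (Socket socket = new Socket("localhost", serverSocket.getLocalPort());
                 PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
                 BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
                out.println(message);           // autoflush is on, so the line is sent immediately
                String reply = in.readLine();   // blocks until the server answers
                server.join();
                return reply;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("hello"));
    }
}
```

Compared with the two programs above, the main difference is the use of try-with-resources, which releases the sockets even when an exception interrupts the exchange.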

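Invoking scheduled tasks remotely

First, we create two modules. The core module contains the core pieces of yang-job, such as the IYangJobExecutor executor interface and the YangJobExecuteRequest request class; the client module depends on core and wraps everything related to the socket-based invocation on the client side. We then create a sample1 module for demonstration.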


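The core module

core currently defines the scheduled-task executor interface together with its input and output types. Among them, YangJobTransferDTO carries the task's class path and the task request, as shown below: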

package com.yang.job.dto;

import com.yang.job.execute.YangJobExecuteRequest;

import java.io.Serializable;

public class YangJobTransferDTO implements Serializable {
private String className;

private YangJobExecuteRequest yangJobExecuteRequest;

public String getClassName() {
    return className;
}

public void setClassName(String className) {
    this.className = className;
}

public YangJobExecuteRequest getYangJobExecuteRequest() {
    return yangJobExecuteRequest;
}

public void setYangJobExecuteRequest(YangJobExecuteRequest yangJobExecuteRequest) {
    this.yangJobExecuteRequest = yangJobExecuteRequest;
}

}

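The client module

The client module defines the classes the client side needs. YangJob is an annotation: every scheduled task must carry the YangJob annotation to be invoked correctly.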

package com.yang.job.client.annotations;

import java.lang.annotation.*;

@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface YangJob {
}

YangJobClientProperty holds the configuration. It currently needs two settings: the IP and the port of the client's socket.

package com.yang.job.client.configuration;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class YangJobClientProperty {
@Value("${yang-job.executor.port}")
private Integer port;

@Value("${yang-job.executor.ip}")
private String ip;


public Integer getPort() {
    return port;
}

public void setPort(Integer port) {
    this.port = port;
}

public String getIp() {
    return ip;
}

public void setIp(String ip) {
    this.ip = ip;
}

}

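After Spring has initialized a bean, YangJobClientPostProcessor checks it: beans that implement IYangJobExecutor and carry the YangJob annotation are registered in the YangJobClientManager map for later invocation.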

package com.yang.job.client.schema;

import com.yang.job.client.annotations.YangJob;
import com.yang.job.client.YangJobClientManager;
import com.yang.job.execute.IYangJobExecutor;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;

public class YangJobClientPostProcessor implements BeanPostProcessor {
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (!(bean instanceof IYangJobExecutor)) {
return bean;
}
YangJob annotation = bean.getClass().getAnnotation(YangJob.class);
if (annotation == null) {
return bean;
}
YangJobClientManager.putJobExecutor(bean.getClass().getName(), (IYangJobExecutor) bean);
return bean;
}
}
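
The filtering rule used by the post-processor (implements the executor interface and carries the annotation) can be tried out without Spring. The following is a minimal sketch under stated assumptions: DemoJob, DemoExecutor, and ExecutorRegistryDemo are hypothetical stand-ins for YangJob, IYangJobExecutor, and YangJobClientManager, and register plays the role of postProcessAfterInitialization.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins for the article's types, so the sketch runs on plain Java.
class ExecutorRegistryDemo {

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME) // RUNTIME retention is required for getAnnotation to see it
    @interface DemoJob {}

    interface DemoExecutor {
        String execute(String param);
    }

    @DemoJob
    static class AnnotatedTask implements DemoExecutor {
        public String execute(String param) { return "ran:" + param; }
    }

    // Implements the interface but lacks the annotation, so it must not be registered.
    static class PlainTask implements DemoExecutor {
        public String execute(String param) { return "ignored"; }
    }

    static final Map<String, DemoExecutor> REGISTRY = new ConcurrentHashMap<>();

    /** Registers the bean under its class name only if both conditions hold. */
    static void register(Object bean) {
        if (!(bean instanceof DemoExecutor)) {
            return;
        }
        if (bean.getClass().getAnnotation(DemoJob.class) == null) {
            return;
        }
        REGISTRY.put(bean.getClass().getName(), (DemoExecutor) bean);
    }

    public static void main(String[] args) {
        register(new AnnotatedTask());
        register(new PlainTask());
        register("not an executor");
        System.out.println(REGISTRY.keySet());
    }
}
```

One thing to keep in mind: the lookup key is the runtime class name. If Spring wraps a task bean in a proxy subclass, bean.getClass() returns the proxy class, so both the annotation check and the key may differ from the class path configured for the task.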

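YangJobClientManager listens on the port and manages task execution. It listens on the port configured as yang-job.executor.port; when a message arrives, it converts the message into the request object, looks up the matching task executor, and runs it.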

package com.yang.job.client;

import com.alibaba.fastjson.JSONObject;
import com.yang.job.client.dto.YangJobClientPropertyDTO;
import com.yang.job.core.dto.YangJobTransferDTO;
import com.yang.job.core.dto.ResultT;
import com.yang.job.core.execute.IYangJobExecutor;
import com.yang.job.core.execute.YangJobExecuteRequest;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class YangJobClientManager {
private static Map<String, IYangJobExecutor> className2JobExecutorMap = new ConcurrentHashMap<>();

private YangJobClientPropertyDTO yangJobClientPropertyDTO;

private ServerSocket serverSocket;

public YangJobClientManager(YangJobClientPropertyDTO yangJobClientPropertyDTO) {
    this.yangJobClientPropertyDTO = yangJobClientPropertyDTO;
}

public void init() {
    Integer port = this.yangJobClientPropertyDTO.getPort();
    try {
        this.serverSocket = new ServerSocket(port);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
    System.out.println("init success============");
    new Thread(() -> {
        while (true) {
            try {
                Socket socket = serverSocket.accept();
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);
                String params = bufferedReader.readLine();
                YangJobTransferDTO yangJobTransferDTO = JSONObject.parseObject(params, YangJobTransferDTO.class);
                System.out.println(yangJobTransferDTO);
                String className = yangJobTransferDTO.getClassName();
                YangJobExecuteRequest yangJobExecuteRequest = yangJobTransferDTO.getYangJobExecuteRequest();
                IYangJobExecutor jobExecutor = getJobExecutor(className);
                if (jobExecutor != null) {
                    ResultT response = jobExecutor.execute(yangJobExecuteRequest);
                    printWriter.println(JSONObject.toJSONString(response));
                }
                bufferedReader.close();
                printWriter.close();
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            if (serverSocket == null || serverSocket.isClosed()) {
                break;
            }
        }
    }).start();
}

public void shutdown() {
    if (this.serverSocket != null) {
        try {
            this.serverSocket.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

public YangJobClientPropertyDTO getYangJobPropertyDTO() {
    return this.yangJobClientPropertyDTO;
}

public static void putJobExecutor(String className, IYangJobExecutor iJobExecutor) {
    className2JobExecutorMap.put(className, iJobExecutor);
}

public static IYangJobExecutor getJobExecutor(String className) {
    return className2JobExecutorMap.get(className);
}

}
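
In the manager above, each task runs on the accept thread, so a slow task delays every later request. One possible variation, shown here only as a sketch, hands each accepted connection to a thread pool. The names PooledAcceptLoopSketch, serve, handle, and send are hypothetical, and the handler is a plain function from request line to response line rather than the article's JSON dispatch.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical sketch: same one-line-per-connection protocol, but handled on a pool.
class PooledAcceptLoopSketch {

    /** Accepts connections until the server socket is closed; each one is handled on the pool. */
    static Thread serve(ServerSocket serverSocket, ExecutorService pool, Function<String, String> handler) {
        Thread acceptor = new Thread(() -> {
            while (!serverSocket.isClosed()) {
                try {
                    Socket socket = serverSocket.accept();
                    pool.submit(() -> handle(socket, handler));
                } catch (IOException e) {
                    // If the server socket was closed, the loop condition ends the thread;
                    // otherwise the loop simply tries to accept again.
                }
            }
        });
        acceptor.setDaemon(true);
        acceptor.start();
        return acceptor;
    }

    /** Reads one request line, writes one response line, then closes the connection. */
    static void handle(Socket socket, Function<String, String> handler) {
        try (Socket s = socket;
             BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()));
             PrintWriter out = new PrintWriter(s.getOutputStream(), true)) {
            String line = in.readLine();
            if (line != null) {
                out.println(handler.apply(line));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Small client used by the demo: sends one line and returns the reply. */
    static String send(int port, String line) throws IOException {
        try (Socket socket = new Socket("localhost", port);
             PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
             BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
            out.println(line);
            return in.readLine();
        }
    }

    public static void main(String[] args) throws IOException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try (ServerSocket serverSocket = new ServerSocket(0)) {
            serve(serverSocket, pool, request -> "handled:" + request);
            System.out.println(send(serverSocket.getLocalPort(), "job-1"));
        } finally {
            pool.shutdown();
        }
    }
}
```

Whether a pool is worthwhile depends on how long tasks run; for short tasks the single-threaded loop in the article is simpler and may be enough.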

YangJobClientContext is the client-side context. It listens for Spring's context-refreshed and context-closed events and reacts accordingly.

package com.yang.job.client;

import com.yang.job.client.utils.SpringContextUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ApplicationContextEvent;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;

public class YangJobClientContext implements ApplicationListener<ApplicationContextEvent> {
private static YangJobClientContext instance;

private ApplicationContext applicationContext;

@Override
public void onApplicationEvent(ApplicationContextEvent event) {
    if (event instanceof ContextRefreshedEvent) {
        System.out.println("Context refreshed=========");
        YangJobClientContext.instance = this;
        instance.applicationContext = event.getApplicationContext();
        init();
    } else if (event instanceof ContextClosedEvent) {
        System.out.println("Context closed=========");
        shutdown();
    }
}

private void init() {
    YangJobClientManager yangJobClientManager = SpringContextUtils.getBeanOfType(YangJobClientManager.class);
    yangJobClientManager.init();
}

private void shutdown() {
    YangJobClientManager yangJobClientManager = SpringContextUtils.getBeanOfType(YangJobClientManager.class);
    yangJobClientManager.shutdown();
}

}

YangJobClientConfiguration is the configuration class that wires up YangJobClientPostProcessor, YangJobClientManager, and YangJobClientContext in one place.

package com.yang.job.client.configuration;

import com.yang.job.client.YangJobClientManager;
import com.yang.job.client.YangJobClientContext;
import com.yang.job.client.dto.YangJobClientPropertyDTO;
import com.yang.job.client.schema.YangJobClientPostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class YangJobClientConfiguration {
@Autowired
private YangJobClientProperty yangJobClientProperty;

@Bean
public YangJobClientPostProcessor yangJobPostProcessor() {
    return new YangJobClientPostProcessor();
}

@Bean
public YangJobClientManager yangJobClientManager() {
    YangJobClientPropertyDTO yangJobClientPropertyDTO = new YangJobClientPropertyDTO();
    yangJobClientPropertyDTO.setIp(yangJobClientProperty.getIp());
    yangJobClientPropertyDTO.setPort(yangJobClientProperty.getPort());
    return new YangJobClientManager(yangJobClientPropertyDTO);
}

@Bean
public YangJobClientContext yangJobContext() {
    return new YangJobClientContext();
}

}

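Finally, so that applications depending on the client module pick up our beans automatically, we create a META-INF directory under resources and add a spring.factories file with the following content: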

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.yang.job.client.utils.SpringContextUtils,\
  com.yang.job.client.configuration.YangJobClientProperty,\
  com.yang.job.client.configuration.YangJobClientConfiguration

sample1

We create a sample1 project and add the spring-boot-starter-web dependency along with the yang-job-client and yang-job-core dependencies.

  
        
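<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.yang</groupId>
        <artifactId>yang-job-core</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>com.yang</groupId>
        <artifactId>yang-job-client</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
</dependencies>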
    

Create the startup class:

package com.yang.job.sample1;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class YangJobSample1App {
public static void main(String[] args) {
SpringApplication.run(YangJobSample1App.class, args);
}
}

Create a task class:

package com.yang.job.sample1.task;

import com.yang.job.client.annotations.YangJob;
import com.yang.job.dto.ResultT;
import com.yang.job.execute.IYangJobExecutor;
import com.yang.job.execute.YangJobExecuteRequest;
import org.springframework.stereotype.Component;

@Component
@YangJob
public class TestTask1 implements IYangJobExecutor {
@Override
public ResultT execute(YangJobExecuteRequest yangJobExecuteRequest) {
System.out.println("Scheduled task started, request: " + yangJobExecuteRequest);
return ResultT.success();
}
}

Add a configuration file. The client module's YangJobClientProperty requires the yang-job.executor.port and yang-job.executor.ip settings; if they are missing, the application fails to start.

spring:
  application:
    name: YangJobSample1App
yang-job:
  executor:
    port: 9999
    ip: 127.0.0.1
server:
  port: 8001

Testing

Start the sample1 project first, then run the following code to invoke the TestTask1 executor remotely.

 public static void main(String[] args) {
        try {
            Socket socket = new Socket("127.0.0.1", 9999);
            System.out.println("Connected=============");
            PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            YangJobExecuteRequest yangJobExecuteRequest = new YangJobExecuteRequest();
            yangJobExecuteRequest.setJobId("1");
            yangJobExecuteRequest.addParam("num", "1");
            YangJobTransferDTO yangJobTransferDTO = new YangJobTransferDTO();
            yangJobTransferDTO.setClassName("com.yang.job.sample1.task.TestTask1");
            yangJobTransferDTO.setYangJobExecuteRequest(yangJobExecuteRequest);
        printWriter.println(JSONObject.toJSONString(yangJobTransferDTO));
        System.out.println("response:" + bufferedReader.readLine());
        bufferedReader.close();
        printWriter.close();
        socket.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

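The result shows that the remote call succeeds.

Adding remote tasks

domain layer

In the previous article every task we handled was a local task. Now we need to handle remote tasks as well, so to tell the two apart we first add a task-type enum to the domain layer.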

package com.yang.job.admin.domain.enums;

public enum JobTypeEnum {
LOCAL("local", "local task"),
REMOTE("remote", "remote task");

private String name;

private String description;

JobTypeEnum(String name, String description) {
    this.name = name;
    this.description = description;
}

public String getName() {
    return name;
}

public String getDescription() {
    return description;
}

public static JobTypeEnum getJobTypeByName(String name) {
    for (JobTypeEnum value : values()) {
        if (value.getName().equals(name)) {
            return value;
        }
    }
    return null;
}

}

Next, modify YangJobModel to add the task-type enum and the remote executor information.

package com.yang.job.admin.domain.model;

import com.yang.job.admin.client.dto.common.BusinessException;
import com.yang.job.admin.client.dto.common.ErrorCode;
import com.yang.job.admin.domain.enums.JobExecuteStrategyEnum;
import com.yang.job.admin.domain.enums.JobTypeEnum;
import com.yang.job.admin.domain.event.SaveJobPostEvent;
import com.yang.job.admin.domain.event.SubmitJobPostEvent;
import com.yang.job.admin.domain.event.UpdateJobPostEvent;
import com.yang.job.admin.domain.valueobject.RemoteExecutorMessage;
import com.yang.job.admin.infra.event.EventCenter;
import com.yang.job.admin.infra.job.YangJobManager;
import com.yang.job.admin.infra.job.request.YangJobSubmitParam;
import com.yang.job.admin.infra.utils.CronUtils;
import com.yang.job.admin.infra.utils.SpringContextUtils;
import lombok.Data;

import java.io.Serializable;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

@Data
public class YangJobModel implements Serializable {
private Integer jobId;

private String jobName;

private String description;

private String cron;

private String executeClassPath;

private Runnable runnable;

private JobExecuteStrategyEnum executeStrategy;

private JobTypeEnum jobType;

private RemoteExecutorMessage remoteExecutorMessage;

private Integer enable;

private Integer open;

private Date createTime;

private Date updateTime;

private Map<String, String> featureMap = new HashMap<>();

private Map<String, String> executeParamMap = new HashMap<>();

public boolean isEnable() {
    if (this.enable == null) {
        return false;
    }
    return this.enable == 1;
}

public boolean isOpen() {
    if (!isEnable()) {
        return false;
    }
    if (this.open == null) {
        return false;
    }
    return this.open == 1;
}

public boolean isClose() {
    return !isOpen();
}

public boolean isLocalJob() {
    return JobTypeEnum.LOCAL == this.jobType;
}

public boolean isRemoteJob() {
    return JobTypeEnum.REMOTE == this.jobType;
}

public void setExecuteStrategy(JobExecuteStrategyEnum jobExecuteStrategyEnum) {
    if (jobExecuteStrategyEnum == null) {
        throw new BusinessException(ErrorCode.EXECUTE_STRATEGY_NO_EXIST);
    }
    this.executeStrategy = jobExecuteStrategyEnum;
}


public void submitJob() {
    YangJobSubmitParam yangJobSubmitParam = convert2YangJobSubmitParam();
    YangJobManager yangJobManager = getYangJobManager();
    yangJobManager.submitJob(yangJobSubmitParam);
    // After submitting the task, publish the post-submit event
    SubmitJobPostEvent submitJobPostEvent = new SubmitJobPostEvent(yangJobSubmitParam);
    getEventCenter().postEvent(submitJobPostEvent);
}

public void cancelJob() {
    YangJobManager yangJobManager = getYangJobManager();
    yangJobManager.cancelJob(this.jobId);
}

private YangJobSubmitParam convert2YangJobSubmitParam() {
    YangJobSubmitParam yangJobBuildParam = new YangJobSubmitParam();
    yangJobBuildParam.setJobId(this.jobId);
    yangJobBuildParam.setRunnable(this.runnable);
    ZonedDateTime nextExecutionTime = CronUtils.nextExecutionTime(this.cron, ZonedDateTime.now());
    ZonedDateTime nextNextExecutionTime = CronUtils.nextExecutionTime(this.cron, nextExecutionTime);
    long nowEochMill = ZonedDateTime.now().toInstant().toEpochMilli();
    long executeEochMill = nextExecutionTime.toInstant().toEpochMilli();
    long secondExecuteEochMill = nextNextExecutionTime.toInstant().toEpochMilli();
    // Divide first, then narrow to int, so a long interval cannot overflow before the division.
    yangJobBuildParam.setInitialDelay((int) ((executeEochMill - nowEochMill) / 1000));
    yangJobBuildParam.setPeriod((int) ((secondExecuteEochMill - executeEochMill) / 1000));
    yangJobBuildParam.setJobExecuteStrategy(this.executeStrategy);
    return yangJobBuildParam;
}

public void postSaveJobEvent() {
    SaveJobPostEvent saveJobPostEvent = new SaveJobPostEvent(this.jobId);
    getEventCenter().asyncPostEvent(saveJobPostEvent);
}

public void postUpdateJobEvent() {
    UpdateJobPostEvent updateJobPostEvent = new UpdateJobPostEvent(this.jobId);
    getEventCenter().asyncPostEvent(updateJobPostEvent);
}

public void postDeleteJobEvent() {
    UpdateJobPostEvent updateJobPostEvent = new UpdateJobPostEvent(this.jobId);
    getEventCenter().asyncPostEvent(updateJobPostEvent);
}

private YangJobManager getYangJobManager() {
    return SpringContextUtils.getBeanOfType(YangJobManager.class);
}

private EventCenter getEventCenter() {
    return SpringContextUtils.getBeanOfType(EventCenter.class);
}

}
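
The delay and period computed in convert2YangJobSubmitParam come from two consecutive fire times of the cron expression. The sketch below works through that arithmetic with fixed timestamps. It is an illustration only: ScheduleDelayDemo and its methods are hypothetical, the fire times are written out by hand instead of coming from CronUtils, and they assume a cron of "0/10 * * * * ?" (every 10 seconds).

```java
import java.time.Duration;
import java.time.ZoneId;
import java.time.ZonedDateTime;

// Hypothetical helper: derives the scheduling parameters from known fire times.
class ScheduleDelayDemo {

    /** Whole seconds from now until the first run. */
    static long initialDelaySeconds(ZonedDateTime now, ZonedDateTime firstRun) {
        return Duration.between(now, firstRun).getSeconds();
    }

    /** Whole seconds between the first and the second run. */
    static long periodSeconds(ZonedDateTime firstRun, ZonedDateTime secondRun) {
        return Duration.between(firstRun, secondRun).getSeconds();
    }

    public static void main(String[] args) {
        ZoneId utc = ZoneId.of("UTC");
        // Hand-written fire times for an every-10-seconds schedule, evaluated at 00:00:03.
        ZonedDateTime now = ZonedDateTime.of(2024, 1, 1, 0, 0, 3, 0, utc);
        ZonedDateTime firstRun = ZonedDateTime.of(2024, 1, 1, 0, 0, 10, 0, utc);
        ZonedDateTime secondRun = ZonedDateTime.of(2024, 1, 1, 0, 0, 20, 0, utc);

        System.out.println("initialDelay=" + initialDelaySeconds(now, firstRun)); // 7
        System.out.println("period=" + periodSeconds(firstRun, secondRun));       // 10
    }
}
```

Note that deriving a fixed period from the first two fire times only matches the cron expression when its fire times are evenly spaced; an expression such as "every weekday at 9:00" has gaps of different lengths.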

The remote executor information class:

package com.yang.job.admin.domain.valueobject;

import lombok.Data;

import java.io.Serializable;

@Data
public class RemoteExecutorMessage implements Serializable {
private String ip;

private Integer port;

}

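Next we add a features enum that records what each key in the features field means. The current table design has no columns for the task type or the remote executor information, so we store them in the features field.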

package com.yang.job.admin.domain.enums;

public enum JobModelFeatureEnum {
JOB_TYPE("jobType", "task type"),
REMOTE_EXECUTOR_IP("executorIp", "executor IP"),
REMOTE_EXECUTOR_PORT("executorPort", "executor port"),
REMOTE_EXECUTOR_MESSAGE("r_executor_m", "remote executor information");

private String name;

private String description;

JobModelFeatureEnum(String name, String description) {
    this.name = name;
    this.description = description;
}


public String getName() {
    return name;
}

public String getDescription() {
    return description;
}

}

client layer

We modify the existing NewYangJobCommand class to add a task-type property.

package com.yang.job.admin.client.dto.command;

import lombok.Data;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

@Data
public class NewYangJobCommand implements Serializable {
private String jobName;

private String description;

private String cron;

private String executeStrategy;

private String jobType;

private String executeClassPath;

private Integer open;

private Map<String, String> params = new HashMap<>();

}

Then modify the YangJobDTO class to add the jobType property as well.

package com.yang.job.admin.client.dto;

import lombok.Data;

import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

@Data
public class YangJobDTO implements Serializable {
private Integer jobId;

private String jobName;

private String description;

private String cron;

private String executeStrategy;

private String executeClassPath;

private String jobType;

private Integer enable;

private Integer open;

private Date createTime;

private Date updateTime;

private Map<String, String> featureMap = new HashMap<>();

private Map<String, String> executeParamMap = new HashMap<>();

}

application layer

Next, modify the convert2YangJobModel method of YangJobApplicationService so that it sets the jobType and the remote executor information on YangJobModel.

 private YangJobModel convert2YangJobModel(NewYangJobCommand newYangJobCommand) {
        String jobType = newYangJobCommand.getJobType();
        JobTypeEnum jobTypeEnum = JobTypeEnum.getJobTypeByName(jobType);
        if (jobTypeEnum == null) {
            throw new BusinessException(ErrorCode.PARAM_VALID_ERROR);
        }
        YangJobModel yangJobModel = new YangJobModel();
        yangJobModel.setJobName(newYangJobCommand.getJobName());
        yangJobModel.setDescription(newYangJobCommand.getDescription());
        yangJobModel.setCron(newYangJobCommand.getCron());
        yangJobModel.setOpen(newYangJobCommand.getOpen());
        yangJobModel.setExecuteStrategy(JobExecuteStrategyEnum.getJobExecuteStrategyByName(newYangJobCommand.getExecuteStrategy()));
        yangJobModel.setExecuteClassPath(newYangJobCommand.getExecuteClassPath());
        yangJobModel.setExecuteParamMap(newYangJobCommand.getParams());
        yangJobModel.setJobType(jobTypeEnum);
        if (jobTypeEnum == JobTypeEnum.REMOTE) {
            String ip = newYangJobCommand.getParams().get(JobModelFeatureEnum.REMOTE_EXECUTOR_IP.getName());
            String port = newYangJobCommand.getParams().get(JobModelFeatureEnum.REMOTE_EXECUTOR_PORT.getName());
            if (ip == null || port == null) {
                throw new BusinessException(ErrorCode.PARAM_VALID_ERROR);
            }
            RemoteExecutorMessage remoteExecutorMessage = new RemoteExecutorMessage();
            remoteExecutorMessage.setIp(ip);
            remoteExecutorMessage.setPort(Integer.valueOf(port));
            yangJobModel.setRemoteExecutorMessage(remoteExecutorMessage);
        } else {
            if (yangJobModel.getExecuteClassPath() == null || yangJobModel.getExecuteClassPath().isEmpty()) {
                throw new BusinessException(ErrorCode.UN_LEGAL_CLASS_PATH);
            }
            try {
                Class.forName(yangJobModel.getExecuteClassPath());
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
                throw new BusinessException(ErrorCode.UN_LEGAL_CLASS_PATH);
            }
        }
        return yangJobModel;
    }
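
In the method above, Integer.valueOf(port) throws NumberFormatException for input such as "abc", and a value like "70000" is accepted even though it is not a valid TCP port. A stricter check could look like the sketch below. ExecutorPortParser and parsePort are hypothetical names, and the sketch throws IllegalArgumentException where the article's code would presumably throw its own BusinessException.

```java
// Hypothetical validation helper for the executorPort request parameter.
class ExecutorPortParser {

    /** Parses a TCP port from request input, rejecting missing, non-numeric, and out-of-range values. */
    static int parsePort(String raw) {
        if (raw == null) {
            throw new IllegalArgumentException("executorPort is required");
        }
        int port;
        try {
            port = Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("executorPort is not a number: " + raw, e);
        }
        // Valid TCP ports for connecting to a listener are 1..65535.
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("executorPort out of range: " + port);
        }
        return port;
    }

    public static void main(String[] args) {
        System.out.println(parsePort("9999"));
        try {
            parsePort("70000");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Validating at the application layer means a bad port is reported when the task is created, rather than surfacing later as a connection failure each time the schedule fires.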

infra layer

Finally, modify the infrastructure layer. First change YangJobModelConvertor so that RemoteExecutorMessage and JobType are written into features and read back from it.

package com.yang.job.admin.infra.gatewayimpl.repository.convertor;

import com.alibaba.fastjson.JSONObject;
import com.yang.job.admin.domain.enums.JobExecuteStrategyEnum;
import com.yang.job.admin.domain.enums.JobModelFeatureEnum;
import com.yang.job.admin.domain.enums.JobTypeEnum;
import com.yang.job.admin.domain.model.YangJobModel;
import com.yang.job.admin.domain.valueobject.RemoteExecutorMessage;
import com.yang.job.admin.infra.data.YangJobData;
import com.yang.job.admin.infra.job.thread.RemoteJobExecuteThread;
import com.yang.job.admin.infra.utils.FeaturesUtils;
import com.yang.job.core.dto.YangJobTransferDTO;
import com.yang.job.core.execute.IYangJobExecutor;
import com.yang.job.core.execute.YangJobExecuteRequest;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class YangJobModelConvertor {
public YangJobData convert2Data(YangJobModel yangJobModel) {
if (yangJobModel == null) {
return null;
}
YangJobData yangJobData = new YangJobData();
yangJobData.setJobId(yangJobModel.getJobId());
yangJobData.setJobName(yangJobModel.getJobName());
yangJobData.setDescription(yangJobModel.getDescription());
yangJobData.setCron(yangJobModel.getCron());
yangJobData.setExecuteClassPath(yangJobModel.getExecuteClassPath());
yangJobData.setEnable(yangJobModel.getEnable());
yangJobData.setOpen(yangJobModel.getOpen());
yangJobData.setCreateTime(yangJobModel.getCreateTime());
yangJobData.setUpdateTime(yangJobModel.getUpdateTime());
Map<String, String> featureMap = yangJobModel.getFeatureMap();
featureMap.put(JobModelFeatureEnum.JOB_TYPE.getName(), yangJobModel.getJobType().getName());
featureMap.put(JobModelFeatureEnum.REMOTE_EXECUTOR_MESSAGE.getName(), JSONObject.toJSONString(yangJobModel.getRemoteExecutorMessage()));
yangJobData.setFeatures(FeaturesUtils.convert2Features(featureMap));
yangJobData.setExecuteParams(FeaturesUtils.convert2Features(yangJobModel.getExecuteParamMap()));
yangJobData.setExecuteStrategy(yangJobModel.getExecuteStrategy().getName());
return yangJobData;
}

public YangJobModel convert2Model(YangJobData yangJobData) {
    if (yangJobData == null) {
        return null;
    }
    YangJobModel yangJobModel = new YangJobModel();
    yangJobModel.setJobId(yangJobData.getJobId());
    yangJobModel.setDescription(yangJobData.getDescription());
    yangJobModel.setCron(yangJobData.getCron());
    yangJobModel.setJobName(yangJobData.getJobName());
    yangJobModel.setExecuteClassPath(yangJobData.getExecuteClassPath());
    yangJobModel.setEnable(yangJobData.getEnable());
    yangJobModel.setOpen(yangJobData.getOpen());
    yangJobModel.setCreateTime(yangJobData.getCreateTime());
    yangJobModel.setUpdateTime(yangJobData.getUpdateTime());
    yangJobModel.setFeatureMap(FeaturesUtils.convert2FeatureMap(yangJobData.getFeatures()));
    yangJobModel.setExecuteParamMap(FeaturesUtils.convert2FeatureMap(yangJobData.getExecuteParams()));
    JobExecuteStrategyEnum executeStrategy = JobExecuteStrategyEnum.getJobExecuteStrategyByName(yangJobData.getExecuteStrategy());
    if (executeStrategy == null) {
        throw new RuntimeException("Invalid execute strategy!");
    }

    JobTypeEnum jobType = JobTypeEnum.getJobTypeByName(yangJobModel.getFeatureMap().get(JobModelFeatureEnum.JOB_TYPE.getName()));
    yangJobModel.setJobType(jobType);
    String remoteMessageStr = yangJobModel.getFeatureMap().get(JobModelFeatureEnum.REMOTE_EXECUTOR_MESSAGE.getName());
    RemoteExecutorMessage remoteExecutorMessage = JSONObject.parseObject(remoteMessageStr, RemoteExecutorMessage.class);
    yangJobModel.setRemoteExecutorMessage(remoteExecutorMessage);

    yangJobModel.setExecuteStrategy(executeStrategy);
    yangJobModel.setRunnable(buildRunnable(yangJobModel));

    return yangJobModel;
}

private Runnable buildRunnable(YangJobModel yangJobModel) {
    if (yangJobModel.isLocalJob()) {
        String executeClassPath = yangJobModel.getExecuteClassPath();
        try {
            Class<?> aClass = Class.forName(executeClassPath);
            if (!IYangJobExecutor.class.isAssignableFrom(aClass)) {
                throw new RuntimeException("The class must implement the IYangJobExecutor interface");
            }
            IYangJobExecutor executor = (IYangJobExecutor) aClass.newInstance();
            YangJobExecuteRequest yangJobExecuteRequest = convert2YangJobExecuteRequest(yangJobModel);
            Runnable runnable = () -> executor.execute(yangJobExecuteRequest);
            return runnable;
        } catch (InstantiationException | IllegalAccessException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            System.out.println(String.format("No class found for class path %s", executeClassPath));
            e.printStackTrace();
        }
    } else {
        RemoteExecutorMessage remoteExecutorMessage = yangJobModel.getRemoteExecutorMessage();
        String executeClassPath = yangJobModel.getExecuteClassPath();

        YangJobTransferDTO yangJobTransferDTO = new YangJobTransferDTO();
        yangJobTransferDTO.setClassName(executeClassPath);

        YangJobExecuteRequest yangJobExecuteRequest = convert2YangJobExecuteRequest(yangJobModel);
        yangJobTransferDTO.setYangJobExecuteRequest(yangJobExecuteRequest);

        return new RemoteJobExecuteThread(remoteExecutorMessage, yangJobTransferDTO);
    }
    return null;
}

private static YangJobExecuteRequest convert2YangJobExecuteRequest(YangJobModel yangJobModel) {
    YangJobExecuteRequest yangJobExecuteRequest = new YangJobExecuteRequest();
    yangJobExecuteRequest.setJobId(yangJobModel.getJobId().toString());
    yangJobExecuteRequest.setParams(yangJobModel.getExecuteParamMap());
    return yangJobExecuteRequest;
}

}


Then add a RemoteJobExecuteThread class that implements the Runnable interface. When a task's type is remote, the runnable property of its YangJobModel is a RemoteJobExecuteThread instance.

package com.yang.job.admin.infra.job.thread;

import com.alibaba.fastjson.JSONObject;
import com.yang.job.admin.domain.valueobject.RemoteExecutorMessage;
import com.yang.job.core.dto.YangJobTransferDTO;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;

public class RemoteJobExecuteThread implements Runnable {
private YangJobTransferDTO yangJobTransferDTO;

private RemoteExecutorMessage remoteExecutorMessage;

public RemoteJobExecuteThread(RemoteExecutorMessage remoteExecutorMessage, YangJobTransferDTO yangJobTransferDTO) {
    this.remoteExecutorMessage = remoteExecutorMessage;
    this.yangJobTransferDTO = yangJobTransferDTO;
}

@Override
public void run() {
    try {
        String ip = remoteExecutorMessage.getIp();
        Integer port = remoteExecutorMessage.getPort();
        Socket socket = new Socket(ip, port);
        PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));

        printWriter.println(JSONObject.toJSONString(yangJobTransferDTO));

        bufferedReader.close();
        printWriter.close();
        socket.close();
    } catch (UnknownHostException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

}
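
RemoteJobExecuteThread sends the request and closes the connection without reading the executor's reply, and it has no timeouts. If the admin side should see the result, a variant along the following lines is one option. This is a sketch under assumptions: RemoteCallSketch, callOnce, and startStub are hypothetical names, the stub stands in for a real YangJobClientManager, and the JSON strings are placeholders rather than the project's actual payloads.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical sketch of a remote call that waits for the reply, with timeouts.
class RemoteCallSketch {

    /** Sends one line to the executor and returns its one-line reply (null if none was sent). */
    static String callOnce(String ip, int port, String jsonLine, int timeoutMillis) throws IOException {
        try (Socket socket = new Socket()) {
            // Fail fast if the executor is unreachable instead of hanging the scheduler thread.
            socket.connect(new InetSocketAddress(ip, port), timeoutMillis);
            // readLine() throws SocketTimeoutException if no reply arrives in time.
            socket.setSoTimeout(timeoutMillis);
            PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
            BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out.println(jsonLine);
            return in.readLine();
        }
    }

    /** Stub executor for the demo: accepts one connection, reads one line, answers with a fixed reply. */
    static Thread startStub(ServerSocket serverSocket, String reply) {
        Thread stub = new Thread(() -> {
            try (Socket socket = serverSocket.accept();
                 BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                 PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {
                in.readLine();
                out.println(reply);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        stub.setDaemon(true);
        stub.start();
        return stub;
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket serverSocket = new ServerSocket(0)) {
            startStub(serverSocket, "{\"status\":\"ok\"}"); // placeholder reply, not the real ResultT format
            String response = callOnce("127.0.0.1", serverSocket.getLocalPort(),
                    "{\"className\":\"com.yang.job.sample1.task.TestTask1\"}", 2000);
            System.out.println("response:" + response);
        }
    }
}
```

With the manager shown earlier, a request for an unregistered class name gets no reply at all, so callOnce would return null in that case once the executor closes the connection.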

Testing

Start the sample1 project from earlier, then start yang-job-admin and call http://localhost:8080/job to add a task with the following request body:

{
  "jobName": "RemoteJobExecutor",
  "description": "RemoteJobExecutor",
  "cron": "0/10 * * * * ?",
  "executeStrategy": "withFixedDelay",
  "executeClassPath": "com.yang.job.sample1.task.TestTask1",
  "open": 1,
  "jobType": "remote",
  "params": {
    "executorIp": "127.0.0.1",
    "executorPort": "9999"
  }
}

Once the task is added, the Sample1 console shows TestTask1 being invoked once every 10 seconds.

References

https://www.yihuo.tech/programming/server-stack/exploring-the-java-network-programming-paradigm-socket-udp-nio-and-netty-in-focus/


This is a companion discussion topic for the original entry at https://juejin.cn/post/7368777511952400410