package foundation.io.engine;
|
|
import java.io.File;
|
import java.io.IOException;
|
import java.util.Date;
|
|
import foundation.dao.DataReader;
|
import foundation.dao.DataWriter;
|
import foundation.dao.Filter;
|
import foundation.dao.OrderBy;
|
import foundation.dao.bizlogic.ICodeProvider;
|
import foundation.data.entity.DictionaryTranslator;
|
import foundation.data.entity.Entity;
|
import foundation.data.entity.EntitySet;
|
import foundation.data.meta.field.FieldsRuntime;
|
import foundation.data.object.BatchCounter;
|
import foundation.data.object.DataObject;
|
import foundation.engine.Engine;
|
import foundation.io.FileRepository;
|
import foundation.io.IOContext;
|
import foundation.io.action.IOActionBucket;
|
import foundation.io.action.IOActionProvider;
|
import foundation.io.define.AppendMode;
|
import foundation.io.define.DataIO;
|
import foundation.io.define.DataIOContainer;
|
import foundation.io.define.DeleteMode;
|
import foundation.io.define.IODirection;
|
import foundation.io.define.IOSQLContext;
|
import foundation.io.define.IOSpeedMode;
|
import foundation.io.define.IOTask;
|
import foundation.io.define.IOWorkStep;
|
import foundation.io.define.IOWorkflow;
|
import foundation.io.mapping.IOMappings;
|
import foundation.io.mapping.MappingsRuntime;
|
import foundation.io.object.DownloadAction;
|
import foundation.io.object.DownloadWriter;
|
import foundation.io.object.Headers;
|
import foundation.io.object.Titles;
|
import foundation.persist.NamedSQL;
|
import foundation.token.IOnlineUser;
|
import foundation.util.ID;
|
import foundation.util.MapList;
|
import foundation.util.Util;
|
|
|
public class IOEngine extends Engine implements IReadDataListener {
|
|
private static int Batch_Count = 5000;
|
public static int Max_Empty_Record = 10;
|
public static boolean test = true;
|
|
protected DataIOContainer ioContainer;
|
protected DataReader dataReader;
|
protected DataWriter dataWriter;
|
protected IOTask ioTask;
|
protected FileProcessor fileProcessor;
|
protected IOResult result;
|
protected IOnlineUser user;
|
protected Date beginTime;
|
protected String ioBatchId;
|
protected ICodeProvider codeProvider;
|
protected IOContext context;
|
protected MapList<String, LineValueTrigger> valueTrigger;
|
protected DictionaryTranslator dictionaryTranslator ;
|
|
|
protected EntitySet entitySet;
|
|
|
public IOEngine() {
|
result = new IOResult(progressor);
|
progressor.setDebug(true);
|
fileProcessor = new FileProcessor(Batch_Count);
|
ioContainer = DataIOContainer.getInstance();
|
}
|
|
public void init(DataReader dataReader, DataWriter dataWriter, IOTask ioTask, IOContext context) {
|
this.dataReader = dataReader;
|
this.dataWriter = dataWriter;
|
this.ioTask = ioTask;
|
this.context = context;
|
}
|
|
@Override
|
protected void sameThreadRun(String operator, Object... args) throws Exception {
|
if ("exec".equalsIgnoreCase(operator)) {
|
//1. clear
|
result.setState(IOState.Working);
|
result.writeBegin();
|
|
//2. write begin
|
progressor.newTask("上传下载操作");
|
|
//3.
|
user = IOnlineUser.getInstance();
|
beginTime = new Date();
|
|
String ioBatchIdFromRequest = dataReader.getString("ioBatchId");
|
|
if (!Util.isEmpty(ioBatchIdFromRequest) ) {
|
ioBatchId = ioBatchIdFromRequest;
|
}else {
|
ioBatchId = ID.newValue();
|
}
|
|
if (context != null) {
|
context.setIoBatchId(ioBatchId);
|
}
|
}
|
}
|
|
@Override
|
protected void newThreadRun(String operator, Object... args) {
|
try {
|
try {
|
//1. open file
|
if (ioTask.existsUploadFile()) {
|
openUploadFile();
|
}
|
else if (ioTask.existsDownloadFile()) {
|
openDownloadFile();
|
}
|
|
//2. run data IO
|
int i = 1;
|
for (DataIO dataIO: ioTask) {
|
//2.1 运行 data IO
|
runOneDataIO(dataIO, i++);
|
|
//2.2 执行 Action
|
IOActionProvider ioAction = getActionInstance(dataIO);
|
if (ioAction != null) {
|
ioAction.exec(dataIO.getActionMethod(), ioTask, dataIO, entitySet);
|
}
|
|
//2.3 如果需要终止,停止运行
|
if ((ioAction != null) && ioAction.isTerminated()) {
|
break;
|
}
|
|
if (!dataWriter.isSuccess()) {
|
break;
|
}
|
}
|
}
|
finally {
|
//3. close file
|
if (ioTask.existsFile()) {
|
closeFile();
|
}
|
|
if (ioTask.existsDownloadFile()) {
|
writeDownloadFile();
|
}
|
}
|
}
|
catch (Exception e) {
|
e.printStackTrace();
|
}
|
finally {
|
progressor.endTask();
|
result.setState(IOState.Finish);
|
}
|
}
|
|
private void openUploadFile() throws IOException, Exception {
|
progressor.newPhase("file_getFile", "打开上载文件");
|
|
IOSpeedMode speedMode = ioTask.getSpeedMode();
|
File uploadFile = fileProcessor.openUploadFile(dataReader, speedMode);
|
|
if (context != null) {
|
context.setUploadFile(uploadFile);
|
}
|
|
progressor.appendLine("文件名称:" + fileProcessor.getUploadFileName());
|
progressor.endPhase();
|
}
|
|
private void openDownloadFile() throws Exception {
|
progressor.newPhase("file_getFile", "创建下载文件");
|
|
IOSpeedMode speedMode = ioTask.getSpeedMode();
|
DataIO dataIO = ioTask.getDownloadDataIO();
|
|
File file = fileProcessor.openDownloadFile(dataReader, dataIO, speedMode);
|
result.setFileURL(FileRepository.pathToURL(file));
|
progressor.appendLine("文件名称:" + file.getName());
|
|
progressor.endPhase();
|
}
|
|
private void writeDownloadFile() throws IOException {
|
File file = fileProcessor.getDownloadFile();
|
|
if (!file.exists()) {
|
logger.error("download file not exists: {}", file);
|
}
|
|
DownloadWriter downloadWriter = new DownloadWriter(dataWriter);
|
downloadWriter.write(file, file.getName(), DownloadAction.AsExcel);
|
}
|
|
private void closeFile() {
|
progressor.newPhase("file_closeFile", "关闭文件");
|
|
if (fileProcessor != null) {
|
fileProcessor.close();
|
}
|
|
progressor.endPhase();
|
}
|
|
private void runOneDataIO(DataIO dataIO, int idx) throws Exception {
|
//1. 开始新日志
|
String position = idx + "/" + ioTask.size();
|
progressor.newPhase("", dataIO.toString() + " (" + position + ")");
|
|
//2. 加载或创建 mapping
|
String mappingsId = dataIO.getMappingId();
|
IOMappings mappingsDesignTime = ioContainer.getIOMappings(mappingsId);
|
|
//3. 初始化 编码生成器
|
codeProvider = null;
|
this.valueTrigger = dataIO.getValueTriggers();
|
|
//4. 运行 data IO
|
IODirection direction = dataIO.getDirection();
|
|
if (IODirection.SheetToTable == direction) {
|
fromSheetToTable(dataIO, mappingsDesignTime);
|
}
|
else if (IODirection.SheetToMemory == direction) {
|
fromSheetToMemory(dataIO, mappingsDesignTime);
|
}
|
else if (IODirection.TableToSheet == direction) {
|
fromTableToSheet(dataIO, mappingsDesignTime);
|
}
|
else if (IODirection.ErrorsToSheet == direction) {
|
fromErrorsToSheet(dataIO, mappingsDesignTime);
|
}
|
else if (IODirection.TableToTable == direction) {
|
fromTableToTable(dataIO, mappingsDesignTime);
|
}
|
|
progressor.endPhase();
|
}
|
|
public void runTableToTables() throws Exception {
|
for (DataIO dataIO: ioTask) {
|
int idx = 1;
|
|
//1. 开始新日志
|
String position = idx + "/" + ioTask.size();
|
progressor.newPhase("", dataIO.toString() + " (" + position + ")");
|
|
//2. 加载或创建 mapping
|
String mappingsId = dataIO.getMappingId();
|
IOMappings mappingsDesignTime = ioContainer.getIOMappings(mappingsId);
|
|
fromTableToTable(dataIO, mappingsDesignTime);
|
idx++;
|
}
|
}
|
|
private void fromSheetToTable(DataIO dataIO, IOMappings mappingsDesignTime) throws Exception {
|
//1. open sheet
|
ISheetReader sheetReader = fileProcessor.openSheetReader(dataIO);
|
|
if (sheetReader == null) {
|
return;
|
}
|
|
//2. read title
|
Titles fromTitles = sheetReader.readTitles();
|
|
//3. create mapping runtime
|
FieldsRuntime fields = dataIO.getToTempDataObject().getTableFieldMetas();
|
MappingsRuntime mappingsRuntime = mappingsDesignTime.createRuntime(fromTitles, fields, dataIO.getMappingDirection());
|
sheetReader.setMappingRuntime(mappingsRuntime);
|
|
//4. 根据需要创建 code provider (根据编码规则,生成自动编码)
|
codeProvider = dataIO.getCodeRule(mappingsRuntime);
|
valueTrigger = dataIO.getValueTriggers();
|
|
|
//5. 所有数据处理范围被局限在批号内(防止多人同时导入)
|
boolean error = false;
|
IOSQLContext ioContext = dataIO.getContext();
|
ioContext.setIoBatchId(ioBatchId);
|
dataWriter.addValue("ioBatchId",ioBatchId);
|
dataReader.loadOneParameter("ioBatchId",ioBatchId);
|
AppendMode appendMode = dataIO.getAppendMode();
|
|
try {
|
//6. 导入数据到临时表
|
DataObject toTempDataObject = dataIO.getToTempDataObject();
|
|
EntitySet entitySet = sheetReader.readData(mappingsRuntime, Batch_Count, this);
|
int count = toTempDataObject.insertEntitySet(entitySet);
|
progressor.appendLine("插入了" + count + "条数据到临时表");
|
|
while (entitySet.isFull()) {
|
entitySet = sheetReader.readData(mappingsRuntime, Batch_Count, this);
|
count = toTempDataObject.insertEntitySet(entitySet) + count;
|
progressor.appendLine("插入了" + count + "条数据到临时表");
|
}
|
|
//7. 运行导入 work flow
|
IOSQLContext sqlContext = dataIO.getContext();
|
sqlContext.setMainMappingsRuntime(mappingsRuntime);
|
|
IOWorkflow workflows = dataIO.getWorkflows();
|
for (IOWorkStep step: workflows) {
|
step.exec(sqlContext, progressor, dataWriter);
|
}
|
|
//8. 导入数据之外的检查(例如: 客商是否与单据头一致)
|
docFilterCheck(sqlContext, dataIO);
|
|
|
//9. 检查是否有错误,如果有错误退出并提示用户
|
NamedSQL namedSQL = NamedSQL.getInstance("importGetImportErrors");
|
sqlContext.setParametersTo(namedSQL);
|
EntitySet errorSet = namedSQL.getEntitySet();
|
|
if (!errorSet.isEmpty()) {
|
error = true;
|
dataWriter.addValue("error_lines", errorSet);
|
dataWriter.setSuccess(false);
|
|
progressor.newPhase("导入数据存在错误");
|
for (Entity entity: errorSet) {
|
progressor.appendLine(entity.getString("error_message") + ": " + entity.getInt("error_count") + "条");
|
}
|
|
return;
|
}
|
|
if (!dataIO.existsToName()) {
|
return;
|
}
|
|
//10.
|
FieldsRuntime toTempfields = dataIO.getToTempDataObject().getTableFieldMetas();
|
FieldsRuntime toFields = dataIO.getToDataObject().getTableFieldMetas();
|
MappingsRuntime toMappingsRuntime = mappingsDesignTime.createRuntime(toTempfields, toFields, null, true);
|
sqlContext.setToMappingsRuntime(toMappingsRuntime);
|
|
//11. 根据 AppendMode 决定是否删除正式表中的原有数据
|
context.setToDateObject(dataIO.getToDataObject());
|
context.setToTempDateObject(dataIO.getToTempDataObject());
|
|
if (AppendMode.ClearAndAppend == appendMode) {
|
if (dataIO.existsFilterFieldValues()) {
|
NamedSQL.execute("deleteToTableByFilter", ioContext);
|
}
|
else {
|
NamedSQL.execute("emptyToTable", ioContext);
|
}
|
|
progressor.appendLine("删除表[" + dataIO.getToName() + "]原有数据");
|
}
|
|
//12. 将临时表数据搬运到正式表
|
namedSQL = NamedSQL.getInstance("transferTempAll");
|
sqlContext.setParametersTo(namedSQL);
|
int cnt = namedSQL.execute();
|
progressor.appendLine("插入了" + cnt + "条数据到正式表");
|
}
|
finally {
|
//13. 删除本次导入的临时表数据(根据批次删除,另外需要定时任务配合检查过期临时数据)
|
DataObject dataObject = DataObject.getInstance(dataIO.getToTempName());
|
|
if (AppendMode.AppendResponse == appendMode) {
|
EntitySet entitySet = dataObject.getTableEntitySet(new Filter("io_batch_id", ioBatchId));
|
dataWriter.addValue("import_data", entitySet);
|
}
|
|
if (!error) {
|
dataObject.deleteEntitySet(new Filter("io_batch_id", "<>", Util.quotedStr(ioBatchId)));
|
}
|
|
}
|
}
|
|
private void docFilterCheck(IOSQLContext sqlContext, DataIO dataIO) throws Exception {
|
String[] segments = dataIO.getCheckDocField();
|
|
if (segments == null || segments.length == 0) {
|
return ;
|
}
|
|
NamedSQL sql = NamedSQL.getInstance("importCheckMatchDocument");
|
sqlContext.setParametersTo(sql);
|
Filter filter = new Filter();
|
|
for (String segment : segments) {
|
filter.add(segment, dataReader.getString(segment));
|
}
|
|
sql.setFilter(filter);
|
sql.execute();
|
|
}
|
|
private void fromSheetToMemory(DataIO dataIO, IOMappings mappingsDesignTime) throws Exception {
|
//1. open sheet
|
ISheetReader sheetReader = fileProcessor.openSheetReader(dataIO);
|
|
if (sheetReader == null) {
|
return;
|
}
|
|
//2. read title
|
Titles fromTitles = sheetReader.readTitles();
|
|
//3. create mapping runtime
|
FieldsRuntime fields = dataIO.getToTempDataObject().getTableFieldMetas();
|
MappingsRuntime mappingsRuntime = mappingsDesignTime.createRuntime(fromTitles, fields, dataIO.getMappingDirection());
|
sheetReader.setMappingRuntime(mappingsRuntime);
|
|
//5. write data to table
|
IOSQLContext ioContext = dataIO.getContext();
|
ioContext.setIoBatchId(ioBatchId);
|
entitySet = sheetReader.readData(mappingsRuntime, Batch_Count, this);
|
progressor.appendLine("读取数据:" + entitySet.size() + "条");
|
|
//6. write result
|
result.setTitles(fromTitles);
|
result.setEntitySet(entitySet);
|
}
|
|
private void fromTableToSheet(DataIO dataIO, IOMappings mappingsDesignTime) throws Exception {
|
//1. 获取数据过滤条件(用户权限过滤、页面传过来的过滤条件)
|
DataObject fromDataObject = dataIO.getFromDataObject();
|
|
Filter filter = new Filter();
|
OrderBy orderBy = new OrderBy();
|
dataReader.loadFilterFromJSON(fromDataObject, filter);
|
dataReader.loadOrderByFromJSON(fromDataObject, orderBy);
|
fromDataObject.loadUserLineFilter(user, filter);
|
|
//2. 获取数据总数量,并设置进度计数器
|
int total = fromDataObject.getCount(filter, dataReader.getUser());
|
progressor.setMax(total);
|
progressor.setStepLength(Batch_Count);
|
|
BatchCounter counter = new BatchCounter(total, Batch_Count);
|
|
//3. 打开Sheet
|
ISheetWriter sheetWriter = fileProcessor.openSheetWriter(dataIO);
|
|
if (sheetWriter == null) {
|
return;
|
}
|
|
try {
|
MappingsRuntime mappingsRuntime = null;
|
FieldsRuntime fields = dataIO.getFromDataObject().getBatchBrowseFieldMetas();
|
|
//4. 如果设置为导出标题,不需要根据标题创建Mapping,Mapping关系取自设定或根据字段导出
|
if (dataIO.isWriteTitle()) {
|
Headers headers = new Headers();
|
mappingsRuntime = mappingsDesignTime.createRuntime(fields, headers, dataIO.getMappingDirection(), IODirection.TableToSheet);
|
sheetWriter.writeTitles(headers.getTitles());
|
}
|
else {
|
Headers headers = sheetWriter.readHeaders();
|
mappingsRuntime = mappingsDesignTime.createRuntime(fields, headers, dataIO.getMappingDirection(), IODirection.TableToSheet);
|
}
|
|
//5. set mapping runtime
|
sheetWriter.setMappingRuntime(mappingsRuntime);
|
sheetWriter.parepareWriteData();
|
|
//7. write lines
|
EntitySet entitySet = fromDataObject.getBrowseEntitySet(filter, orderBy, counter);
|
|
int cnt = 0;
|
cnt = sheetWriter.writeData(entitySet);
|
progressor.appendLine("写入数据" + cnt + "行");
|
|
while (entitySet.isFull()) {
|
entitySet = fromDataObject.getBrowseEntitySet(filter, orderBy, counter);
|
cnt = cnt + sheetWriter.writeData(entitySet);
|
progressor.appendLine("写入数据" + cnt + "行");
|
|
progressor.stepIt();
|
}
|
|
}
|
catch (Exception e) {
|
e.printStackTrace();
|
progressor.appendLine("错误:" + e.getMessage());
|
}
|
}
|
|
private void fromErrorsToSheet(DataIO dataIO, IOMappings mappingsDesignTime) throws Exception {
|
//1. 获取数据总数量,并设置进度计数器
|
DataObject toTempObject = dataIO.getToTempDataObject();
|
Filter filter = new Filter("io_batch_id", ioBatchId);
|
|
int total = toTempObject.getCount(filter, dataReader.getUser());
|
progressor.setMax(total);
|
progressor.setStepLength(Batch_Count);
|
|
BatchCounter counter = new BatchCounter(total, Batch_Count);
|
|
//2. 打开Sheet
|
ISheetWriter sheetWriter = fileProcessor.openSheetWriter(dataIO);
|
|
if (sheetWriter == null) {
|
return;
|
}
|
|
try {
|
//3. writer error message title
|
sheetWriter.writeErrorTitle();
|
|
//4. create mapping runtime
|
FieldsRuntime fields = dataIO.getToTempDataObject().getTableFieldMetas();
|
MappingsRuntime mappingsRuntime = mappingsDesignTime.createRuntime(fields, new Headers(), dataIO.getMappingDirection(), IODirection.ErrorsToSheet);
|
sheetWriter.setMappingRuntime(mappingsRuntime);
|
sheetWriter.parepareWriteData();
|
|
//4. write error
|
EntitySet entitySet = toTempObject.getBrowseEntitySet(filter, counter);
|
|
int cnt = 0;
|
cnt = sheetWriter.writeErrors(entitySet);
|
progressor.appendLine("写入数据" + cnt + "行");
|
|
while (entitySet.isFull() && cnt != 0 ) {
|
entitySet = toTempObject.getBrowseEntitySet(filter, counter);
|
cnt = cnt + sheetWriter.writeErrors(entitySet);
|
|
progressor.appendLine("写入数据" + cnt + "行");
|
progressor.stepIt();
|
}
|
}
|
catch (Exception e) {
|
e.printStackTrace();
|
progressor.appendLine("错误:" + e.getMessage());
|
}
|
}
|
|
private void fromTableToTable(DataIO dataIO, IOMappings mappingsDesignTime) throws Exception {
|
//1.
|
IOSQLContext ioContext = dataIO.getContext();
|
ioContext.setIoBatchId(ioBatchId);
|
|
//2.1 create to mapping runtime
|
FieldsRuntime fromFields = dataIO.getFromDataObject().getTableFieldMetas();
|
FieldsRuntime toFields = dataIO.getToDataObject().getTableFieldMetas();
|
MappingsRuntime toMappingsRuntime = mappingsDesignTime.createRuntime(fromFields, toFields, dataIO.getMappingDirection());
|
ioContext.setToMappingsRuntime(toMappingsRuntime);
|
ioContext.setMainMappingsRuntime(toMappingsRuntime);
|
|
AppendMode appendMode = dataIO.getAppendMode();
|
DeleteMode deleteMode = dataIO.getDeleteMode();
|
int cnt = 0;
|
|
//3. 运行 work flow
|
IOSQLContext sqlContext = dataIO.getContext();
|
|
IOWorkflow workflows = dataIO.getWorkflows();
|
for (IOWorkStep step: workflows) {
|
step.exec(sqlContext, progressor, dataWriter);
|
}
|
|
if (AppendMode.Append == appendMode) {
|
cnt = NamedSQL.execute("transferAll", ioContext);
|
progressor.appendLine("插入数据:" + cnt);
|
}
|
else if (AppendMode.ClearAndAppend == appendMode) {
|
//1). clear data
|
if (dataIO.existsFilterFieldValues()) {
|
cnt = NamedSQL.execute("deleteToTableByFilter", ioContext);
|
}
|
else {
|
cnt = NamedSQL.execute("emptyToTable", ioContext);
|
}
|
progressor.appendLine("删除数据:" + cnt);
|
|
//2). transfer data
|
cnt = NamedSQL.execute("transferAll", ioContext);
|
progressor.appendLine("插入数据:" + cnt);
|
}
|
else if (AppendMode.UpdateChanged == appendMode) {
|
//1). delete not exists data
|
if (DeleteMode.HardDelete == deleteMode) {
|
cnt = NamedSQL.execute("hardDeleteToTableNotExists", ioContext);
|
progressor.appendLine("删除数据:" + cnt);
|
}
|
else if (DeleteMode.SoftDelete == deleteMode) {
|
cnt = NamedSQL.execute("softDeleteToTableNotExists", ioContext);
|
progressor.appendLine("软删除数据:" + cnt);
|
}
|
|
//2). update exists data
|
cnt = NamedSQL.execute("updateToTableExists", ioContext);
|
progressor.appendLine("更新数据:" + cnt);
|
|
//3). insert append data
|
cnt = NamedSQL.execute("insertToTableAppend", ioContext);
|
progressor.appendLine("插入数据:" + cnt);
|
}
|
else if (AppendMode.InsertChanged == appendMode) {
|
//1). update exists data
|
cnt = NamedSQL.execute("updateToTableExists", ioContext);
|
progressor.appendLine("更新数据:" + cnt);
|
|
//2). insert append data
|
cnt = NamedSQL.execute("insertToTableAppend", ioContext);
|
progressor.appendLine("插入数据:" + cnt);
|
}
|
}
|
|
@Override
|
public void onReadLine(Entity entity) {
|
if (!valueTrigger.isEmpty()) {
|
for (LineValueTrigger valueTrigger : valueTrigger) {
|
valueTrigger.onReadLine(entity);
|
}
|
}
|
else if (codeProvider != null) {
|
String code = codeProvider.nextval(null, entity);
|
entity.set("temp_code", code);
|
}
|
|
//2. 自动生成其他内容
|
entity.set("io_batch_id", ioBatchId);
|
entity.set("creator_id", user.getId());
|
entity.set("creator_name", user.getName());
|
entity.set("create_time", beginTime);
|
entity.set("update_time", beginTime);
|
}
|
|
private IOActionProvider getActionInstance(DataIO dataIO) {
|
String path = dataIO.getAction();
|
|
if (Util.isEmpty(path)) {
|
return null;
|
}
|
|
Class<IOActionProvider> providerClass = IOActionBucket.getInstance().getOne(path);
|
|
if (providerClass == null) {
|
return null;
|
}
|
|
IOActionProvider provider = null;
|
try {
|
provider = providerClass.newInstance();
|
provider.init(dataReader, dataWriter);
|
}
|
catch (Exception e) {
|
e.printStackTrace();
|
}
|
|
return provider;
|
}
|
|
public IOResult getResult() {
|
return result;
|
}
|
|
public void setIoBatchId(String ioBatchId) {
|
this.ioBatchId = ioBatchId;
|
}
|
|
}
|