package foundation.io.engine; import java.io.File; import java.io.IOException; import java.util.Date; import foundation.dao.DataReader; import foundation.dao.DataWriter; import foundation.dao.Filter; import foundation.dao.OrderBy; import foundation.dao.bizlogic.ICodeProvider; import foundation.data.entity.DictionaryTranslator; import foundation.data.entity.Entity; import foundation.data.entity.EntitySet; import foundation.data.meta.field.FieldsRuntime; import foundation.data.object.BatchCounter; import foundation.data.object.DataObject; import foundation.engine.Engine; import foundation.io.FileRepository; import foundation.io.IOContext; import foundation.io.action.IOActionBucket; import foundation.io.action.IOActionProvider; import foundation.io.define.AppendMode; import foundation.io.define.DataIO; import foundation.io.define.DataIOContainer; import foundation.io.define.DeleteMode; import foundation.io.define.IODirection; import foundation.io.define.IOSQLContext; import foundation.io.define.IOSpeedMode; import foundation.io.define.IOTask; import foundation.io.define.IOWorkStep; import foundation.io.define.IOWorkflow; import foundation.io.mapping.IOMappings; import foundation.io.mapping.MappingsRuntime; import foundation.io.object.DownloadAction; import foundation.io.object.DownloadWriter; import foundation.io.object.Headers; import foundation.io.object.Titles; import foundation.persist.NamedSQL; import foundation.token.IOnlineUser; import foundation.util.ID; import foundation.util.MapList; import foundation.util.Util; public class IOEngine extends Engine implements IReadDataListener { private static int Batch_Count = 5000; public static int Max_Empty_Record = 10; public static boolean test = true; protected DataIOContainer ioContainer; protected DataReader dataReader; protected DataWriter dataWriter; protected IOTask ioTask; protected FileProcessor fileProcessor; protected IOResult result; protected IOnlineUser user; protected Date beginTime; protected String ioBatchId; protected ICodeProvider codeProvider; protected IOContext context; protected MapList valueTrigger; protected DictionaryTranslator dictionaryTranslator ; protected EntitySet entitySet; public IOEngine() { result = new IOResult(progressor); progressor.setDebug(true); fileProcessor = new FileProcessor(Batch_Count); ioContainer = DataIOContainer.getInstance(); } public void init(DataReader dataReader, DataWriter dataWriter, IOTask ioTask, IOContext context) { this.dataReader = dataReader; this.dataWriter = dataWriter; this.ioTask = ioTask; this.context = context; } @Override protected void sameThreadRun(String operator, Object... args) throws Exception { if ("exec".equalsIgnoreCase(operator)) { //1. clear result.setState(IOState.Working); result.writeBegin(); //2. write begin progressor.newTask("上传下载操作"); //3. user = IOnlineUser.getInstance(); beginTime = new Date(); String ioBatchIdFromRequest = dataReader.getString("ioBatchId"); if (!Util.isEmpty(ioBatchIdFromRequest) ) { ioBatchId = ioBatchIdFromRequest; }else { ioBatchId = ID.newValue(); } if (context != null) { context.setIoBatchId(ioBatchId); } } } @Override protected void newThreadRun(String operator, Object... args) { try { try { //1. open file if (ioTask.existsUploadFile()) { openUploadFile(); } else if (ioTask.existsDownloadFile()) { openDownloadFile(); } //2. run data IO int i = 1; for (DataIO dataIO: ioTask) { //2.1 运行 data IO runOneDataIO(dataIO, i++); //2.2 执行 Action IOActionProvider ioAction = getActionInstance(dataIO); if (ioAction != null) { ioAction.exec(dataIO.getActionMethod(), ioTask, dataIO, entitySet); } //2.3 如果需要终止,停止运行 if ((ioAction != null) && ioAction.isTerminated()) { break; } if (!dataWriter.isSuccess()) { break; } } } finally { //3. close file if (ioTask.existsFile()) { closeFile(); } if (ioTask.existsDownloadFile()) { writeDownloadFile(); } } } catch (Exception e) { e.printStackTrace(); } finally { progressor.endTask(); result.setState(IOState.Finish); } } private void openUploadFile() throws IOException, Exception { progressor.newPhase("file_getFile", "打开上载文件"); IOSpeedMode speedMode = ioTask.getSpeedMode(); File uploadFile = fileProcessor.openUploadFile(dataReader, speedMode); if (context != null) { context.setUploadFile(uploadFile); } progressor.appendLine("文件名称:" + fileProcessor.getUploadFileName()); progressor.endPhase(); } private void openDownloadFile() throws Exception { progressor.newPhase("file_getFile", "创建下载文件"); IOSpeedMode speedMode = ioTask.getSpeedMode(); DataIO dataIO = ioTask.getDownloadDataIO(); File file = fileProcessor.openDownloadFile(dataReader, dataIO, speedMode); result.setFileURL(FileRepository.pathToURL(file)); progressor.appendLine("文件名称:" + file.getName()); progressor.endPhase(); } private void writeDownloadFile() throws IOException { File file = fileProcessor.getDownloadFile(); if (!file.exists()) { logger.error("download file not exists: {}", file); } DownloadWriter downloadWriter = new DownloadWriter(dataWriter); downloadWriter.write(file, file.getName(), DownloadAction.AsExcel); } private void closeFile() { progressor.newPhase("file_closeFile", "关闭文件"); if (fileProcessor != null) { fileProcessor.close(); } progressor.endPhase(); } private void runOneDataIO(DataIO dataIO, int idx) throws Exception { //1. 开始新日志 String position = idx + "/" + ioTask.size(); progressor.newPhase("", dataIO.toString() + " (" + position + ")"); //2. 加载或创建 mapping String mappingsId = dataIO.getMappingId(); IOMappings mappingsDesignTime = ioContainer.getIOMappings(mappingsId); //3. 初始化 编码生成器 codeProvider = null; this.valueTrigger = dataIO.getValueTriggers(); //4. 运行 data IO IODirection direction = dataIO.getDirection(); if (IODirection.SheetToTable == direction) { fromSheetToTable(dataIO, mappingsDesignTime); } else if (IODirection.SheetToMemory == direction) { fromSheetToMemory(dataIO, mappingsDesignTime); } else if (IODirection.TableToSheet == direction) { fromTableToSheet(dataIO, mappingsDesignTime); } else if (IODirection.ErrorsToSheet == direction) { fromErrorsToSheet(dataIO, mappingsDesignTime); } else if (IODirection.TableToTable == direction) { fromTableToTable(dataIO, mappingsDesignTime); } progressor.endPhase(); } public void runTableToTables() throws Exception { for (DataIO dataIO: ioTask) { int idx = 1; //1. 开始新日志 String position = idx + "/" + ioTask.size(); progressor.newPhase("", dataIO.toString() + " (" + position + ")"); //2. 加载或创建 mapping String mappingsId = dataIO.getMappingId(); IOMappings mappingsDesignTime = ioContainer.getIOMappings(mappingsId); fromTableToTable(dataIO, mappingsDesignTime); idx++; } } private void fromSheetToTable(DataIO dataIO, IOMappings mappingsDesignTime) throws Exception { //1. open sheet ISheetReader sheetReader = fileProcessor.openSheetReader(dataIO); if (sheetReader == null) { return; } //2. read title Titles fromTitles = sheetReader.readTitles(); //3. create mapping runtime FieldsRuntime fields = dataIO.getToTempDataObject().getTableFieldMetas(); MappingsRuntime mappingsRuntime = mappingsDesignTime.createRuntime(fromTitles, fields, dataIO.getMappingDirection()); sheetReader.setMappingRuntime(mappingsRuntime); //4. 根据需要创建 code provider (根据编码规则,生成自动编码) codeProvider = dataIO.getCodeRule(mappingsRuntime); valueTrigger = dataIO.getValueTriggers(); //5. 所有数据处理范围被局限在批号内(防止多人同时导入) boolean error = false; IOSQLContext ioContext = dataIO.getContext(); ioContext.setIoBatchId(ioBatchId); dataWriter.addValue("ioBatchId",ioBatchId); dataReader.loadOneParameter("ioBatchId",ioBatchId); AppendMode appendMode = dataIO.getAppendMode(); try { //6. 导入数据到临时表 DataObject toTempDataObject = dataIO.getToTempDataObject(); EntitySet entitySet = sheetReader.readData(mappingsRuntime, Batch_Count, this); int count = toTempDataObject.insertEntitySet(entitySet); progressor.appendLine("插入了" + count + "条数据到临时表"); while (entitySet.isFull()) { entitySet = sheetReader.readData(mappingsRuntime, Batch_Count, this); count = toTempDataObject.insertEntitySet(entitySet) + count; progressor.appendLine("插入了" + count + "条数据到临时表"); } //7. 运行导入 work flow IOSQLContext sqlContext = dataIO.getContext(); sqlContext.setMainMappingsRuntime(mappingsRuntime); IOWorkflow workflows = dataIO.getWorkflows(); for (IOWorkStep step: workflows) { step.exec(sqlContext, progressor, dataWriter); } //8. 导入数据之外的检查(例如: 客商是否与单据头一致) docFilterCheck(sqlContext, dataIO); //9. 检查是否有错误,如果有错误退出并提示用户 NamedSQL namedSQL = NamedSQL.getInstance("importGetImportErrors"); sqlContext.setParametersTo(namedSQL); EntitySet errorSet = namedSQL.getEntitySet(); if (!errorSet.isEmpty()) { error = true; dataWriter.addValue("error_lines", errorSet); dataWriter.setSuccess(false); progressor.newPhase("导入数据存在错误"); for (Entity entity: errorSet) { progressor.appendLine(entity.getString("error_message") + ": " + entity.getInt("error_count") + "条"); } return; } if (!dataIO.existsToName()) { return; } //10. FieldsRuntime toTempfields = dataIO.getToTempDataObject().getTableFieldMetas(); FieldsRuntime toFields = dataIO.getToDataObject().getTableFieldMetas(); MappingsRuntime toMappingsRuntime = mappingsDesignTime.createRuntime(toTempfields, toFields, null, true); sqlContext.setToMappingsRuntime(toMappingsRuntime); //11. 根据 AppendMode 决定是否删除正式表中的原有数据 context.setToDateObject(dataIO.getToDataObject()); context.setToTempDateObject(dataIO.getToTempDataObject()); if (AppendMode.ClearAndAppend == appendMode) { if (dataIO.existsFilterFieldValues()) { NamedSQL.execute("deleteToTableByFilter", ioContext); } else { NamedSQL.execute("emptyToTable", ioContext); } progressor.appendLine("删除表[" + dataIO.getToName() + "]原有数据"); } //12. 将临时表数据搬运到正式表 namedSQL = NamedSQL.getInstance("transferTempAll"); sqlContext.setParametersTo(namedSQL); int cnt = namedSQL.execute(); progressor.appendLine("插入了" + cnt + "条数据到正式表"); } finally { //13. 删除本次导入的临时表数据(根据批次删除,另外需要定时任务配合检查过期临时数据) DataObject dataObject = DataObject.getInstance(dataIO.getToTempName()); if (AppendMode.AppendResponse == appendMode) { EntitySet entitySet = dataObject.getTableEntitySet(new Filter("io_batch_id", ioBatchId)); dataWriter.addValue("import_data", entitySet); } if (!error) { dataObject.deleteEntitySet(new Filter("io_batch_id", "<>", Util.quotedStr(ioBatchId))); } } } private void docFilterCheck(IOSQLContext sqlContext, DataIO dataIO) throws Exception { String[] segments = dataIO.getCheckDocField(); if (segments == null || segments.length == 0) { return ; } NamedSQL sql = NamedSQL.getInstance("importCheckMatchDocument"); sqlContext.setParametersTo(sql); Filter filter = new Filter(); for (String segment : segments) { filter.add(segment, dataReader.getString(segment)); } sql.setFilter(filter); sql.execute(); } private void fromSheetToMemory(DataIO dataIO, IOMappings mappingsDesignTime) throws Exception { //1. open sheet ISheetReader sheetReader = fileProcessor.openSheetReader(dataIO); if (sheetReader == null) { return; } //2. read title Titles fromTitles = sheetReader.readTitles(); //3. create mapping runtime FieldsRuntime fields = dataIO.getToTempDataObject().getTableFieldMetas(); MappingsRuntime mappingsRuntime = mappingsDesignTime.createRuntime(fromTitles, fields, dataIO.getMappingDirection()); sheetReader.setMappingRuntime(mappingsRuntime); //5. write data to table IOSQLContext ioContext = dataIO.getContext(); ioContext.setIoBatchId(ioBatchId); entitySet = sheetReader.readData(mappingsRuntime, Batch_Count, this); progressor.appendLine("读取数据:" + entitySet.size() + "条"); //6. write result result.setTitles(fromTitles); result.setEntitySet(entitySet); } private void fromTableToSheet(DataIO dataIO, IOMappings mappingsDesignTime) throws Exception { //1. 获取数据过滤条件(用户权限过滤、页面传过来的过滤条件) DataObject fromDataObject = dataIO.getFromDataObject(); Filter filter = new Filter(); OrderBy orderBy = new OrderBy(); dataReader.loadFilterFromJSON(fromDataObject, filter); dataReader.loadOrderByFromJSON(fromDataObject, orderBy); fromDataObject.loadUserLineFilter(user, filter); //2. 获取数据总数量,并设置进度计数器 int total = fromDataObject.getCount(filter, dataReader.getUser()); progressor.setMax(total); progressor.setStepLength(Batch_Count); BatchCounter counter = new BatchCounter(total, Batch_Count); //3. 打开Sheet ISheetWriter sheetWriter = fileProcessor.openSheetWriter(dataIO); if (sheetWriter == null) { return; } try { MappingsRuntime mappingsRuntime = null; FieldsRuntime fields = dataIO.getFromDataObject().getBatchBrowseFieldMetas(); //4. 如果设置为导出标题,不需要根据标题创建Mapping,Mapping关系取自设定或根据字段导出 if (dataIO.isWriteTitle()) { Headers headers = new Headers(); mappingsRuntime = mappingsDesignTime.createRuntime(fields, headers, dataIO.getMappingDirection(), IODirection.TableToSheet); sheetWriter.writeTitles(headers.getTitles()); } else { Headers headers = sheetWriter.readHeaders(); mappingsRuntime = mappingsDesignTime.createRuntime(fields, headers, dataIO.getMappingDirection(), IODirection.TableToSheet); } //5. set mapping runtime sheetWriter.setMappingRuntime(mappingsRuntime); sheetWriter.parepareWriteData(); //7. write lines EntitySet entitySet = fromDataObject.getBrowseEntitySet(filter, orderBy, counter); int cnt = 0; cnt = sheetWriter.writeData(entitySet); progressor.appendLine("写入数据" + cnt + "行"); while (entitySet.isFull()) { entitySet = fromDataObject.getBrowseEntitySet(filter, orderBy, counter); cnt = cnt + sheetWriter.writeData(entitySet); progressor.appendLine("写入数据" + cnt + "行"); progressor.stepIt(); } } catch (Exception e) { e.printStackTrace(); progressor.appendLine("错误:" + e.getMessage()); } } private void fromErrorsToSheet(DataIO dataIO, IOMappings mappingsDesignTime) throws Exception { //1. 获取数据总数量,并设置进度计数器 DataObject toTempObject = dataIO.getToTempDataObject(); Filter filter = new Filter("io_batch_id", ioBatchId); int total = toTempObject.getCount(filter, dataReader.getUser()); progressor.setMax(total); progressor.setStepLength(Batch_Count); BatchCounter counter = new BatchCounter(total, Batch_Count); //2. 打开Sheet ISheetWriter sheetWriter = fileProcessor.openSheetWriter(dataIO); if (sheetWriter == null) { return; } try { //3. writer error message title sheetWriter.writeErrorTitle(); //4. create mapping runtime FieldsRuntime fields = dataIO.getToTempDataObject().getTableFieldMetas(); MappingsRuntime mappingsRuntime = mappingsDesignTime.createRuntime(fields, new Headers(), dataIO.getMappingDirection(), IODirection.ErrorsToSheet); sheetWriter.setMappingRuntime(mappingsRuntime); sheetWriter.parepareWriteData(); //4. write error EntitySet entitySet = toTempObject.getBrowseEntitySet(filter, counter); int cnt = 0; cnt = sheetWriter.writeErrors(entitySet); progressor.appendLine("写入数据" + cnt + "行"); while (entitySet.isFull() && cnt != 0 ) { entitySet = toTempObject.getBrowseEntitySet(filter, counter); cnt = cnt + sheetWriter.writeErrors(entitySet); progressor.appendLine("写入数据" + cnt + "行"); progressor.stepIt(); } } catch (Exception e) { e.printStackTrace(); progressor.appendLine("错误:" + e.getMessage()); } } private void fromTableToTable(DataIO dataIO, IOMappings mappingsDesignTime) throws Exception { //1. IOSQLContext ioContext = dataIO.getContext(); ioContext.setIoBatchId(ioBatchId); //2.1 create to mapping runtime FieldsRuntime fromFields = dataIO.getFromDataObject().getTableFieldMetas(); FieldsRuntime toFields = dataIO.getToDataObject().getTableFieldMetas(); MappingsRuntime toMappingsRuntime = mappingsDesignTime.createRuntime(fromFields, toFields, dataIO.getMappingDirection()); ioContext.setToMappingsRuntime(toMappingsRuntime); ioContext.setMainMappingsRuntime(toMappingsRuntime); AppendMode appendMode = dataIO.getAppendMode(); DeleteMode deleteMode = dataIO.getDeleteMode(); int cnt = 0; //3. 运行 work flow IOSQLContext sqlContext = dataIO.getContext(); IOWorkflow workflows = dataIO.getWorkflows(); for (IOWorkStep step: workflows) { step.exec(sqlContext, progressor, dataWriter); } if (AppendMode.Append == appendMode) { cnt = NamedSQL.execute("transferAll", ioContext); progressor.appendLine("插入数据:" + cnt); } else if (AppendMode.ClearAndAppend == appendMode) { //1). clear data if (dataIO.existsFilterFieldValues()) { cnt = NamedSQL.execute("deleteToTableByFilter", ioContext); } else { cnt = NamedSQL.execute("emptyToTable", ioContext); } progressor.appendLine("删除数据:" + cnt); //2). transfer data cnt = NamedSQL.execute("transferAll", ioContext); progressor.appendLine("插入数据:" + cnt); } else if (AppendMode.UpdateChanged == appendMode) { //1). delete not exists data if (DeleteMode.HardDelete == deleteMode) { cnt = NamedSQL.execute("hardDeleteToTableNotExists", ioContext); progressor.appendLine("删除数据:" + cnt); } else if (DeleteMode.SoftDelete == deleteMode) { cnt = NamedSQL.execute("softDeleteToTableNotExists", ioContext); progressor.appendLine("软删除数据:" + cnt); } //2). update exists data cnt = NamedSQL.execute("updateToTableExists", ioContext); progressor.appendLine("更新数据:" + cnt); //3). insert append data cnt = NamedSQL.execute("insertToTableAppend", ioContext); progressor.appendLine("插入数据:" + cnt); } else if (AppendMode.InsertChanged == appendMode) { //1). update exists data cnt = NamedSQL.execute("updateToTableExists", ioContext); progressor.appendLine("更新数据:" + cnt); //2). insert append data cnt = NamedSQL.execute("insertToTableAppend", ioContext); progressor.appendLine("插入数据:" + cnt); } } @Override public void onReadLine(Entity entity) { if (!valueTrigger.isEmpty()) { for (LineValueTrigger valueTrigger : valueTrigger) { valueTrigger.onReadLine(entity); } } else if (codeProvider != null) { String code = codeProvider.nextval(null, entity); entity.set("temp_code", code); } //2. 自动生成其他内容 entity.set("io_batch_id", ioBatchId); entity.set("creator_id", user.getId()); entity.set("creator_name", user.getName()); entity.set("create_time", beginTime); entity.set("update_time", beginTime); } private IOActionProvider getActionInstance(DataIO dataIO) { String path = dataIO.getAction(); if (Util.isEmpty(path)) { return null; } Class providerClass = IOActionBucket.getInstance().getOne(path); if (providerClass == null) { return null; } IOActionProvider provider = null; try { provider = providerClass.newInstance(); provider.init(dataReader, dataWriter); } catch (Exception e) { e.printStackTrace(); } return provider; } public IOResult getResult() { return result; } public void setIoBatchId(String ioBatchId) { this.ioBatchId = ioBatchId; } }