±à¼ÍƼö: |
±¾ÎÄÀ´×Ôcsdn£¬±¾ÎÄÖ÷Òª½éÉÜÁËPipeline(Á÷Ë®Ïß)ģʽµÄÎÊÌâ½â¾ö˼·£¬²¢Í¨¹ýʾÀý´úÂë½éÉÜPipelineģʽµÄ·þÎñ£¬Ï£Íû±¾ÎĶÔÄúµÄѧϰÓÐËù°ïÖú¡£ |
|
ģʽÃû³Æ
Pipeline(Á÷Ë®Ïß)ģʽ
ģʽ½â¾öµÄÎÊÌâ
ÓÐʱһЩÏ̵߳IJ½×à±È½ÏÈß³¤£¬¶øÇÒÓÉÓÚÿ¸ö½×¶ÎµÄ½á¹ûÓëϽ׶εÄÖ´ÐÐÓйØÏµ£¬ÓÖ²»ÄÜ·Ö¿ª¡£
½â¾ö˼·
¿ÉÒÔ½«ÈÎÎñµÄ´¦Àí·Ö½âΪÈô¸É¸ö´¦Àí½×¶Î£¬ÉÏÒ»¸ö½×¶ÎÈÎÎñµÄ½á¹û½»¸øÏÂÒ»¸ö½×¶ÎÀ´´¦Àí£¬ÕâÑùÿ¸öÏ̵߳Ĵ¦ÀíÊDz¢Ðеģ¬¿ÉÒÔ³ä·ÖÀûÓÃ×ÊÔ´Ìá¸ß¼ÆËãЧÂÊ¡£
ģʽËùʹÓõÄÀࣺ
Pipe¶Ô´¦Àí½×¶ÎµÄ³éÏ󣬸ºÔð¶ÔÊäÈë½øÐд¦Àí£¬²¢½«Êä³ö×÷ΪÏÂÒ»¸ö½×¶ÎµÄÊäÈë:process£¨£©ÓÃÓÚ½ÓÊÕǰһ¸ö´¦Àí½×¶ÎµÄ´¦Àí½á¹û£¬×÷Ϊ¸Ã´¦Àí½×¶ÎµÄÊäÈë,init£¨£©³õʼ»¯µ±Ç°´¦Àí½×¶Î¶ÔÍâÌṩµÄ·þÎñ,shutdown£¨£©¹Ø±Õµ±Ç°´¦Àí½×¶Î¶ÔÍâÌṩµÄ·þÎñ,setNextPipe£¨£©ÉèÖõ±Ç°´¦Àí½×¶ÎµÄÏÂÒ»¸ö´¦Àí½×¶Î¡£
PipeContext¶Ô¸÷¸ö´¦Àí½×¶ÎµÄ¼ÆËã»·¾³½øÐгéÏó£¬Ö÷ÒªÓÃÓÚÒì³£´¦Àí:handleError£¨£©ÓÃÓÚ¶Ô´¦Àí½×¶ÎÅÝ´×µÄÒì³£½øÐд¦Àí¡£
AbstractPipeÀàPipe½Ó¿ÚµÄ³éÏóʵÏÖÀà:process£¨£©½ÓÊÕÇÀÒ»¸ö´¦Àí½×¶ÎµÄÊäÈë²¢µ÷ÓÃÆä×ÓÀàʵÏÖµÄdoProcess·½·¨¶ÔÊäÈëÔªËØ½øÐд¦Àí,init£¨£©±£´æ¶ÔÆä²ÎÊýÖÐÖÆ¶¨µÄPipeContextʵÀýµÄÒýÓã¬×ÓÀà¿É¸ù¾ÝÐèÒª¸²¸Ç¸Ã·½·¨ÒÔʵÏÖÆä·þÎñµÄ³õʼ»¯¡£
ShutdownĬÈÏʵÏÖʲôҲ²»×ö£¬×ÓÀà¿É¸ù¾ÝÐèÒª¸²¸Ç¸Ã·½·¨ÊµÏÖ·þÎñÍ£Ö¹:setNextPipe£¨£©ÉèÖõ±Ç°´¦Àí½×¶ÎµÄÏÂÒ»¸ö´¦Àí½×¶Î,doProcess£¨£©Áô¸ø×ÓÀàʵÏֵijéÏó·½·¨£¬ÓÃÓÚʵÏÖÆä·þÎñµÄ³õʼ»¯¡£
WorkerThreadPipeDecprator»ùÓÚ¹¤×÷Ï̵߳ÄPipeʵÏÖÀ࣬¸ÃPipeʵÀý»á½«½ÓÊÕµ½µÄÊäÈëÔªËØ´æÈë¶ÓÁУ¬ÓÉÖÆ¶¨¸öÊýµÄ¹¤×÷ÕßÏ̶߳ԶÓÁÐÖÐÊäÈëÔªËØ½øÐд¦Àí£¬¸ÃÀàµÄ×ÔÉíÖ÷Òª¸ºÔð¹¤×÷ÕßÏ̵߳ÄÉúÃüÖÜÆÚµÄ¹ÜÀí:process£¨£©½ÓÊÕǰһ¸ö´¦Àí½×¶ÎµÄÊäÈ룬²¢½«Æä´æÈë¶ÓÁÐÓɹ¤×÷ÕßÏß³ÌÔËÐÐʱȡ³ö½øÐд¦Àí,init£¨£©Æô¶¯¹¤×÷ÕßÏ̲߳¢µ÷ÓÃίÍÐPipeʵÀýµÄinit·½·¨,shutdown£¨£©Í£Ö¹¹¤×÷ÕßÏ̲߳¢Î¯ÍÐPipeʵÀýµÄshutdown·½·¨,setNextPipe£¨£©µ÷ÓÃίÍÐPipeʵÀýµÄsetNextPipe·½·¨,dispatch£¨£©È¡¶ÓÁÐÖеÄÊäÈëÔªËØ²¢µ÷ÓÃίÍÐPipeʵÀýµÄprocess·½·¨¶ÔÆä½øÐд¦Àí
ThreadPoolPopeDecorator»ùÓÚÏ̳߳صÄPipeµÄʵÏÖÀà:process£¨£©½ÓÊÕǰһ¸ö´¦Àí½×¶ÎµÄÊäÈ룬²¢ÏòÏ̳߳ØÌá½»Ò»¸ö¶Ô¸ÃÊäÈë½øÐÐÏàÓ¦´¦ÀíµÄÈÎÎñ,init£¨£©µ÷ÓÃίÍÐpipeʵÀýµÄinit·½·¨,shutdown£¨£©¹Ø±Õµ±Ç°PipeʵÀý¶ÔÍâÌṩµÄ·þÎñ²¢µ÷ÓÃίÍÐPipeʵÀýµÄshutdown·½·¨,setNextPipe£¨£©µ÷ÓÃίÍÐPipeʵÀýµÄsetNextPipe·½·¨¡£
AbstractParallelPileÀàAbstractPipeµÄ×ÓÀ֧࣬³Ö²¢Ðд¦ÀíµÄPipeʵÏÖÀ࣬¸ÃÀà¶ÔÆäÿ¸öÊäÈëÔªËØ£¨ÔʼÈÎÎñ£©Éú³ÉÏàÓ¦µÄÒ»×é×ÓÈÎÎñ£¬²¢ÒÔ²¢Ðеķ½Ê½È¥Ö´ÐÐÕâЩ×ÓÈÎÎñ£¬¸÷¸ö×ÓÈÎÎñµÄÖ´Ðнá¹û»á±»ºÏ²¢ÎªÏàÓ¦ÔʼÈÎÎñµÄÊä³ö½á¹û£ºbulidTasks()Á÷¸ø×ÓÀàʵÏֵijéÏó·½·¨£¬ÓÃÓÚ¸ù¾ÝÖÆ¶¨µÄÊäÈë¹¹ÔìÒ»×é×ÓÈÎÎñ,combineResults()Áô¸ø×ÓÀàʵÏֵijéÏó·½·¨£¬¶Ô¸÷¸ö²¢ÐÐ×ÓÈÎÎñµÄ´¦Àí½á¹û½øÐкϲ¢£¬ÐγÉÏàÓ¦ÊäÈëÔªËØµÄÊä³ö½á¹û¡£invokeParallel()ʵÏÖÒÔ²¢Ðеķ½Ê½Ö´ÐÐÒ»×éÈÎÎñ,doProcess()ʵÏÖ¸ÃÀà¶ÔÆäÊäÈëµÄ´¦ÀíÂß¼¡£
ConcreteParallelPipeÀàÓÉÓ¦Óö¨ÒåµÄAbstractParallelPipeµÄ×ÓÀà:buildTasks()¸ù¾ÝÖ¸¶¨µÄÊäÈë¹¹ÔìÒ»×é×ÓÈÎÎñ,combineResults()¶Ô¸÷¸ö²¢ÐÐ×ÓÈÎÎñµÄ´¦Àí½á¹û½øÐкϲ¢£¬ÐγÉÏàÓ¦ÊäÈëÔªËØµÄÊä³ö½á¹û
PipelineÀà¶Ô·ûºÏPipeµÄ³éÏó£ºaddPipe()Íù¸ÃPipelineʵÀýÖÐÌí¼ÓÒ»¸öPipeʵÀý¡£
SimplePipelineÀà»ùÓÚAbstractPipeµÄPipeline½Ó¿ÚµÄÒ»¸ö¼òµ¥ÊµÏÖÀࣺaddPipe()Íù¸ÃPipelineʵÀýÖÐÌí¼ÓÒ»¸öPipeʵÀý,addAsWorkerThreadBasedPipe()½«Öƶ¨µÄPipeʵÀýÓÃWorkerThreadPipeDecoratorʵÀý°ü×°ºó¼ÓÈëPipelineʵÀý,addAsThreadPoolBasedPipe()½«Öƶ¨µÄPipeʵÀýÓÃThreadPoolPipeDecoratorʵÀý°ü×°ºó¼ÓÈëPipelineʵÀý¡£
PipelineģʽµÄ·þÎñ³õʼ»¯ÐòÁÐͼ
ʾÀý´úÂë
ijϵͳÐèÒªÒ»¸öÊý¾Ýͬ²½µÄ¶¨Ê±ÈÎÎñ£¬¸Ã¶¨Ê±ÈÎÎñ½«Êý¾Ý¿âÖзûºÏÖÆ¶¨Ìõ¼þµÄ¼Ç¼Êý¾ÝÒÔÎļþµÄÐÎʽFTP´«Ê䣨ͬ²½£©µ½Öƶ¨µÄÖ÷»úÉÏ¡£¸Ã¶¨Ê±ÈÎÎñÐèÒªÂú×ãÒÔÏÂÒªÇó£º
1.ÿ¸öÊý¾ÝÎļþ×î¶àÖ»°üº¬N£¨Èç10000£¬¾ßÌå¿ÉÅäÖã©Ìõ¼Ç¼£»µ±Ò»¸öÊý¾ÝÎļþ±»Ð´Âúʱ£¬ÆäËû´úл¼Ç¼»á±»Ð´ÈëеÄÊý¾ÝÎļþ¡£
2.ÿ¸öÊý¾ÝÎļþ¿ÉÄÜÐèÒª±»´«Êäµ½¶ą̀Ö÷»úÉÏ¡£
3.±¾µØÒª±£Áôͬ²½¹ýµÄÊý¾ÝÎļþµÄ±¸·Ý¡£
Òò´Ë£¬¸Ã¶¨Ê±ÈÎÎñÐèÒª×öÈý¼þÊÂÇ飬¶¼ÊDZȽϺÄʱµÄ²Ù×÷£¬¶øÇÒºóÃæµÄ²Ù×÷»¹ÐèÒªÒÀÀµÇ°Ãæ²Ù×÷µÄ½á¹û²ÅÄܽøÐУ¬²»Òײð·Ö£¬Èç¹ûÖ»ÊÇÓöàỊ̈߳¬Ã¿¸öÏß³ÌÖÐÈÔÈ»Êǰ´Ë³Ðò´®Ðд¦ÀíÒ²ÊDz»ºÏÊʵģ¬ÕâÑùµÄ»°µÚ¶þ¸ö²½×à»á³öÏÖ¶àÏß³ÌÖ®¼äÕù¶á×ÊÔ´µ¼ÖÂʱ¼äÀ˷ѵÄÎÊÌ⣬»á¸üÄÑÍê³ÉÈÎÎñ£¬ËùÒÔÐèÒªÓÐPipelineģʽȥִÐС£
Êý¾Ýͬ²½¶¨Ê±ÈÎÎñ
public class
DataSynctask implements Runnable{
public void run(){
ResultSet rs = null;
SimplePipeline<RecordSaveTask,String>
pipeline = buildPipeline();
pipeline.init(pipeline.newDefaultPipeContext());
Connection dbConn = null;
try{
dbConn = getConnection();
rs = qryRecords(dbConn);
processRecords(rs.pipeline);
}catch(Exception e){
e.printStackTrace();
}finally{
if(null != dbConn){
try{
dbConn.close();
}catch (SQLException e){
;
}
}
pipeline.shutdown(360,TimeUnit.SECONDS);
}
private ResultSet qryRecords(Connection dbConn) throws Exception{
dbConn.setReadOnly(true);
PreparedStatement ps = dbConn
.prepareStatement("select id,productId,packageId,msisdn,operationTime,
operationTyoe," +
"effectiveDate,dueDate from subscriptions
order by operationTime",
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ
_ONLY);
ResultSet rs = ps.executeQuery();
return rs;
}
private static Connection getConnection() throws
Exception{
Connection dbConn = null;
Class.forName("org.hsqldb.jdbc.JDBCDriver");
dbConn = DriverManager.getConnectio ("jdbc:hsqldb:hsql://192.168.1.105:9001","SA","");
return dbConn;
}
private static Record makeRecordFrom(ResultSet
rs)
throws SQLException{
Record record = new Record();
record.setId(rs.getInt("id"));
record.setProductId(rs.getString("productId"));
record.setPackageId(rs.getString("msisdn"));
record.setOperationTime(rs.getTimestamp ("operationTime"));
record.setOperationType(rs.getInt("operationType"));
record.setEffectiveDate(rs.getTimestamp ("effectiveDate"));
record.setDueDate(rs.getTimestamp("dueDate"));
}
private static class RecordSaveTask{
public final Record[] records;
public final int targetFileIndex;
public final String recordDay;
public RecordSaveTask(Record[] records,int targetFileIndex){
this.records = records;
this.targetFileIndex = targetFileIndex;
this.recordDay = null;
}
puclic RecordSaveTask(String recordDay,int
targeFileIndex){
this.records = null;
this.targetFileIndex = targetFileIndex;
this.recordDay = recordDay;
}
} private SimplePipeline<RecordSavetask,String>
buildPipeline(){
/*
* Ï̵߳ı¾ÖÊÊÇÖØ¸´ÀûÓÃÒ»¶¨ÊýÁ¿µÄỊ̈߳¬¶ø²»ÊÇÕë¶Ô ÿ¸öÈÎÎñ
¶¼ÓÐÒ»¸öרÃŵŤ×÷ÕßÏ̡߳£
* ÕâÀ¸÷¸öPipeµÄ³õʼ»¯ÍêÈ«¿ÉÒÔÔÚÉÏÓÎPipe³õʼ»¯
Íê±ÏºóÔÙ
³õʼ»¯Æäºó¼ÌPipe£¬¶þ²»±Ø¶à
* ¸öPipeͬʱ³õʼ»¯¡£
* Òò´Ë£¬Õâ¸ö³õʼ»¯µÄ¶¯×÷¿ÉÒÔÓÉÒ»¸öÏß³ÌÀ´´¦Àí¡£
¸ÃÏ̴߳¦Àí
Íê¸÷¸öPipeµÄ³õʼ»¯ºó£¬¿ÉÒÔ¼ÌÐø
* ´¦ÀíÖ®ºó¿ÉÄܲúÉúµÄÈÎÎñ£¬Èç³ö´í´¦Àí¡£
* ËùÒÔ£¬ÉÏÊöÕâЩÏȺó²úÉúµÄÈÎÎñ¿ÉÒÔÓÉÏ̳߳ØÖеÄ
Ò»¸ö¹¤×÷Õß
Ï̴߳ÓÍ·µ½Î²Ö´ÐС£
*/
final ExecutorService helperExecutor = Executors.newSingleThreadExecutor();
final SimplePipeline<RecordSaveTask,String>
pipeline = new
SimplePipeline<RecordSaveFile,String>(helperExcecutor);
Pipe<RecordSaveTask,File> stageSaveFile
= new AbstractPipe<RecordSaveTask,File>(){
final RecordWriter recordWriter =RecordWriter.getInstance();
final Record[] records = task.records;
File file;
if(null == records){
file = recordWriter.write(records,task.targerFileIndex);
}else{
try{
file = recordWriter.write(records.task.targetFileIndex);
}catch(IOException e){
throw new PipeException(this,task,"Failed
to save records",e);
}
}
};
/*
* ÓÉÓÚÕâÀïµÄ¼¸¸öPipe¶¼ÊÇ´¦ÀíI/OµÄ£¬ÎªÁ˱ÜÃâʹÓÃËø
£¨ÒÔ¼õÉÙ²»±ØÒªµÄÉÏÏÂÎÄÇл»£©µ«ÓÖÄÜ
* ±£Ö¤Ḭ̈߳²È«£¬¹Êÿ¸öPipe¶¼²ÉÓõ¥Ï̴߳¦Àí¡£
* Èô¸÷¸öPipeÒª¸ÄÓÃÏ̳߳ØÀ´´¦Àí£¬ÐèҪעÒ⣺1
Ḭ̈߳²È«2£©ËÀËø¡£
*/
pipeline.addAsWorkerThreadBasedPipe(stageSaveFile,1);
final String[][] ftpServerConfigs = retrieveFTP
ServConf();
final ThreadPoolExecutor ftpExecutorService
= new ThreadPoolExecutor(1
ftpServerConfigs.length,60,TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(100),new
RejectedExecutionHandler(){
@Override
public void rejectedExecution(Runnable r,ThreadPoolExecutor
executor){
if(!executor.isShutdown()){
try{
executor.getQueue().put(r);
}catch(InterruptedException e){
;
}
}
}
});
Pipe<File,File,File> stageTransferFile
= new AbstractParallelPipe<File,File,
File>((new SynchronousQueue<file>(),ftpExecutor
Service)){
Future[ftpServerConfigs.length];
@Override
public void init(PipeContext piptCtx){
super.init(pipeCtx);
String[] ftpServerConfig;
for(int i = 0; i <ftoServerConfigs.length;
i++){
ftpServerConfig = ftpServerConfigs[i];
ftpClientUtilHolders[i] = FTPClientUtil.newInstance(
ftpServerConfig[0],ftpServerConfig[1],ftpServer
Config[2]);
}
}
@Override
protected List<Callable<File>> buildTasks(final
File file){
List<Callable<File>> tasks = new
LinkedList<Callable<File>>();
for(Fucture<FTPClientUtil> ftpClientUtilHolder:
ftpClientUtilHolders){
tasks.add(new ParallelTask(fipClientUtilHolder,file));
}
return tasks;
}
@Override
protected File combineResults(List<Future<File>>
subTaskResults)
throws Exception{
if(0 == subTaskResults.size()){
return null;
}
File file = null;
file = subTaskResults.get(0).get();
return file;
}
@Override
public void shutdown(long timeout,TimeUnit unit){
super.shutdown(timeout/unit);
ftpExecutorService.shutdown();
try{
ftpExecutorService.awaitTermination(timeout,
unit);
}catch(InterruptedException e){
;
}
for(Future<FTPClientUtil> ftpClientUtilHolder:
ftpClientUtilHolders){
try{
ftpClientUtilHolder.get().disconnect();
}catch(Exception e){
;
}
}
}
class ParallelTask implements Callable<File>{
public final Future<FTPClientUtil>ftpUtilHodler;
public final File file2Transfer;
public ParallelTask(Future<FTPClientUtil>
ftpUtilHodler,
File file2Transfer){
this.ftpUtilHodler = ftpUtilHodler;
this.file2Transfer = file2Transfer;
}
@Override
public File call() throws Exception{
File transferedFile = null;
ftpUtilHodler.get().upload(file2Transfer);
transferedFile = file2Transfer;
return transferedFile;
}
}
};
pipeline.addAsWorkerThreadBasedPipe(stageTrans
ferFile,1);
//±¸·ÝÒѾ´«ÊäµÄÊý¾ÝÎļþ
Pipe<File,Void> stageBackupFile = new
AbstractPipe<File,Void>(){
@Override
protected Void doProcess(File transferedFile)
throws PipeException{
RecordWriter.backupFile(transferedFile);
return null;
}
public void shutdown(long timeout,TimeUnit unit){
//ËùÓÐÎļþ±¸·ÝÍê±Ïºó£¬ÆßÀïµô¿ÕÎļþ¼Ð
RecordWriter.purgeDir();
}
};
pipeline.addAsWorkerThreadBasedPipe(stageTransferFile,
1);
return pipeline;
}
private String[] retrieveFTPServConf(){
String[][]ftpServerConfigs = new String[][]{
{"192.168.1.105","datacenter","abc123"}
,
{"192.168.1.105","datacenter","abc123"}
,
{"192.168.1.105","datacenter","abc123"}
};
return ftpServerConfits;
}
private void processRecords(ResiltSet rs,
Pipeline<RecordSaveTask,String>pipeline)
throws Exception{
Recprd record;
Recprd[]records = new Record[Config.RECORD_SAVE_CHNK_SIZE];
int targeFileIndex = 0;
int nextTargetFileIndex = 0;
int recordCountInTheDay = 0;
int recordCountInTheFile = 0;
String recordDay = null;
String lastRecordDay = null;
SimpleDateFormat sdf = new SimpleDateFormat("yyMMdd");
while(rs.next()){
record = makeRecordFrom(rs);
lastRecordDay = recordDay;
recordDay = sdf.format(record.getOperationTime());
if(recordDay.equals(lastRecordDay)){
records[recordCountInTheFile] = record;
recordCountInTheDay++;
}else{
//ʵ¼ÊÒÔ·¢ÉúµÄ²»Í¬ÈÕÆÚ¼Ç¼ÎļþÇл»
if(null != lastRecprdDay){
pipeline.process(new RecordSaveTask(Arrays.copyOf(records,
recordCountInTheFile).targetFileIndex));
}else{
pipeline
.process(new RecordSaveTask(laskRecordDay,targeFileIndex));
}
//ÔÚ´Ë֮ǰ£¬ÏȽ«recordsÖеÄÄÚÈÝдÈëÎļþ
records[0] = record;
recordCountInTheFile = 0;
}
recordCountInTheDay = 1;
}
if(nextTargetFileIndex == targetFileIndex){
recordCountInTheFile++;
if(0 == (recordCountInTheFile %Config.RECORD_SAVE_CHUNK_SIZE)){
pipeline.process(new RecordSaveTask(Arrays.copyOf(recirds,
recordCountInTheFile),targetFileIndex));
recordCountInTheFile = 0;
}
}
nextTargetFileIndex = (recordCountInTheDay)/
Config.MAX_RECORDS_PER_FILE;
if(nextTargetFileIndex > targetFileIndex){
//Ô¤²âµ½½«·¢ÉúͬÈÕÆÚ¼Ç¼ÎļþÇл»
if(recordCountInTheFile > 1){
pipeline.process(new RecordSaveTask(Arrays.copyOf(records,
recordCountInTheFile),targetFileIndex));
}else{
pipeline.process(new RecordSaveTask(recordDay,targetFileIndex));
}
recordCountInTheFile = 0;
targetFileIndex = nextTargetFileIndex;
}else if(nextTargetFileIndex < targetFileIndex){
//ʵ¼ÊÒÑ·¢ÉúµÄÒìÈÕÆÚ¼Ç¼ÎļþÇл»£¬recordCountInTheFile
±£³Öµ±Ç°Öµ
targetFileIndex = nextTargetFileIndex;
}
}
if(recordCountInTheFile > 0){
pipeline.process(new RecordSaveTask(Arrays.copyOf(records,
recordCountInTheFile),targetFileIndex));
}
}
}
|
FTPClientUtilÀà
public class
FTPClientUtil {
private final FtpClient ftp = new FTPClient();
private final Map<String,Boolean>dirCreateMap
= new HashMap<String,Boolean>();
/*
* helperExecutorÊǸö¾²Ì¬±äÁ¿£¬ÕâʹµÃnewInstance·½·¨ÔÚÉú³É²»Í¬µÄFTPClientUtilʵÀýÊÇ
* ¿ÉÒÔ¹«ÓÃͬһ¸öÏ̳߳أ»
* ģʽ½ÇÉ«£ºPromise.TaskExecutor
*/
private volatile static ExecurorService helperExecutor;
static{
helperExcurtor = new ThreadPoolExecutor(1,
Runtime.getRuntime().availableProcessors()*2,
60,TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10),new
ThreadFactory(){
public Thread newThread(Runable r){
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
},new ThreadPoolExecutor.CallerRunsPolicy());
}
//˽Óй¹ÔìÆ÷
private FTPClientUtil(){
}
//ģʽ½ÇÉ«£ºPromise.Promisor.conmpute
public static Future<FTOClientUtil> newInstance(final
String ftpServer,final
String userName,final String password){
Callable<FTPClientUtil> callable = new Callable<FTPClientUtil>(){
@Override
public FTPClientUtil call() throws Exception{
FTPClientUtil selt = new FTPClientUtil();
selt.init(ftpServer,userName,password);
return self;
}
};
//taskÏ൱ÓÚģʽ½ÇÉ«£ºPromise.Primise
final FutureTask<FTOClientUtil> task = new
FutureTask<FTOClientUtil>(callable);
helperExecuror.execute(task);
return task;
}
private void init(String ftpServer,String userName,String
password)throws
Exception{
FTPClientConfig config = new FTPClientConfig();
ftp.configure(config);
int reply;
ftp.connect(ftpServer);
System.out.print(ftp.getReplyString());
reply =ftp.getReplyCode();
if(!FTPReply.isPositiveCompletion(reply)){
ftp.disconnect();
throw new RuntimeException("FTp server refufused
connection.");
}
boolean isOK = ftp.login(uesrName, password);
if(isOK){
System.out.println(fto,getReplyString());
}else{
throw new RuntimeException("Failed to login."
+ ftp.getReplyString());
}
reply = ftp.cwd("`/subspeync");
if(!FTPReply.isPositiveCompletion(reply)){
ftp.disconnect();
throw new RuntimeException("Failed to change
working"+ "directory.reply."
+ reply);
}else{
System.out.println(ftp.getReplyString());
}
ftp.setFileType(FTP.ASEII_FILE_TYPE);
}
public void upload(File file) throws Exception{
InputStream dataIn = new BufferedInputStream(new
FileInputStream(file)1024*8);
boolean isOS;
String dirName = file.getParent();
String fileName = dirName + '/' + file.getName();
ByteArrayInputStream checkFileInputSream = new
ByteArrayInputStream("".getBytes());
try{
if(!dirCreateMap.containskey(dirName)){
ftp.makeDirectory(dirName);
dirCreateMap.put(dirName,null);
try{
isOK = ftp.storeFile(fileName,dataIn);
}catch(IOException e){
throw new RuntimeException("Failed to upload
" + file,e);
}
if(isOK){
ftp.storeFile(fileName + ".c",checkFileInputStream);
}else{
throw new RuntimeException("Failed to upload
" + file + ",reply:" + ","
+ ftp.getReplyString());
}
}finally{
dataIn.close();
}
}
public void disconnect(){
if(fto,isConnected){
try{
ftp.disconnect();
}catch(IOException ioe){
//ʲôҲ²»×ö
}
}
}
}
} |
RecordWriterÀà
public class
RecirdWriter {
private static final RecoedWriter INSTANCE =
new RecordWriter();
//HashMap²»Êǰ²È«µÄ£¬µ«RecordWriterʵÀýÊÇÔÚµ¥Ïß
³ÌÖÐʹÓõÄ
£¬Òò´Ë²»»á²úÉúÎÊÌâ
private static Map<Sting,PrintWriter>
printWriterMap
= new HashMap<String,PrintWriter>();
private static String baseDir;
private static final char FIELD_SEPARATOR =
'|';
//SimpleDateFormat²»ÊÇḬ̈߳²È«µÄ£¬µ«RecordWriter
ʵÀý
ÊÇÔÚµ¥Ïß³ÌÖÐʹÓõģ¬Òò´Ë²»»á²úÉúÎÊÌâ¡£
private static final SimpleDateFormat FILE_INDEX_
FORMATTER = new DecimalFormat{
"0000"};
private static final int RECORD_JOIN_SIZE =
Config.RECORD_JOIN_SIZE;
private static final FieldPosition FIELD_POS
= new FieldPssition(
DateFormat.Field.DAY_OF_MONTH);
//˽Óй¹ÔìÆ÷
private RecordWriter(){
baseDir = System.getProperty("user.home")
+ "/tmp/
subspsync/";
}
public static RecordWriter getInstance(){
return INSTANCE;
}
public File write(Record[]targetFileIndex) throws
IOException{
if(null == records || 0 == records.length){
throw new IllegalArgumentException("records
is null or empty");}
int recordCount = records.length;
String recordDay;
recordDay = DIRECTORY_NAME_FORMATTER.format
(records[0].
getOperat
ionTime());
String fileKey = recordDay + '-' + targeFileIndex;
PrintWriter pwr = printWriterMap.get(fileKey);
if(null == pwr){
File file = new Fiole(baseDir + '/' + recordDay
+
"/subspeync-gw-"
+ FILE_INDEX_FORMATTER.format(targetFileIndex)
+
".dat");
File dir = file.getParentFile();
if(!dir.exists() && !dir.mkdirr()){
throw new IOException("No such directory:"
+ dir);
}
pwr = new PrintWriter(new BufferedWritern(new
FileWriter(file,true),
Cofig.WRITER_BUFFER_SIZE));
printWriterMap.put(fileKey,pwr);
}
StringBuffer strBuf = new StringBuffer(40);
int i = 0;
for(Record record:records){
i++;
pwr.print(String.valueOf(record.getId()));
pwr.print(FIELD_SEPARATOR);
pwr.print(record.getMsisdn());
pwr.print(FIELD_SEPARATOR);
pwr.print(record.getProductId());
pwr.print(FIELD_SEPARATOR);
pwr.print(record.getRackageId());
pwr.print(FUEKD_SEPARATOR);
pwr.print(String.valueOf(record. getOperationType()));
pwr.print(FIELD_SEPARATOR);
strBuf.delete(0, 40);
pwr.print(sdf.format(recore.getOperationtime(),
strBuff,FIELD_POS));pwr.print(FIELD_SEPARATOR);
strBuf.delete(0, 40);
pwr.print(sdf.format(recore.getOperationtime(),
strBuff,FIELD_POS));
strBuf.delete(0, 40);
pwr.print(FIELD_SEPARATOR);
pwr.print(sdf.format(record.getDueDate(),
strBuf,FIELD_POS));
pwr.print('\n');
if(0 == (i % RECORD_JOIN_SIZE)){
pwr.flush();
i = 0;
//Thread.yield();
}
}
if(i > 0){
pwr.flush();
}
File file = null;
//´¦Àíµ±Ç°ÎļþÖеÄ×îºóÒ»×é¼Ç¼
if(recordCount <Config.RECORD_SAVE_CHUNK_SIZE){
pwr.close();
file = new FIle(baseDir + '/' + recordDay +
"/
subspsync-gw-"
+ FILE_INDEX_FORMATER.format(targetFileIndex)
+ ".dat");
printWriterMap.remove(fileKey);
}
return file;
}
public File finishRecords(String recordDay,int
targetFileIndex){
String fileKey = recordDay + '-' + targetFileIndex;
PrintWriter pwr = printWriterMap.get(fileKey);
File file = null;
if(null != pwr){
pwr.flush();
pwr.close();
file = new File(baseDir + '/' + recordDay +
"/
subspsync-gw-"
+ FILE_INDEX_FORMATER.format(targetFileIndex)
+
".dat");
printWriterMap.remove(fileKey);
}
retutn file;
}
public static void backupFile(final File file){
String recordDay = file.getParentFile().getName();
File destFile = new File(baseDir + "/backup"
+
recordDay);
if(!destFile.exists()){
throw new RuntimeException("Failed to backup
file
" + file);
}
file.delete();
}
public static void purgeDir(){
File[]dirs = new File(baseDir).listFiles();
for(File dir:dirs){
if(dir.isDirectory() && 0 ==dir.list().length){
dir.delete();
}
}
}
}
|
ģʽ¿¼Á¿
Pipelineģʽ¿ÉÒÔ¶ÔÓÐÒÀÀµ¹ØÏµµÄÈÎÎñʵÏÖ²¢Ðд¦Àí¡£²¢ÐкͲ¢·¢±à³ÌÖУ¬ÎªÁËÌá¸ß²¢·¢ÐÔÎÒÃÇÍúÍúÐèÒª½«¹æÄ£½Ï´óµÄÈÎÎñ·Ö½â³·¹ñÈô¸É¸ö¹æÄ£½ÏСµÄ×ÓÈÎÎñ£¬ÕâЩ×ÓÈÎÎñ¼äͬ³ÇûÓÐÒÀÀµ¹ØÏµ¡£¶øPipelineģʽÔòÔÊÐí×ÓÈÎÎñ¼ä´æÔÚÒÀÀµ¹ØÏµµÄÌõ¼þÏÂʵÏÖ²¢ÐÐÔËËã¡£
PipelineģʽΪÓõ¥Ïß³Ìģʽ±à³ÌÌṩÁ˱ãÀû¡£¶àÏ̱߳à³Ì×ܵÄÀ´ËµÊǸ´Ôӵ쬲»½ö´úÂë±àд±È½Ï¸´ÔÓ£¬³öÏÖÎÊÌâÒ²²»ºÃ¶¨Î»£¬¶àÏ̳߳öÏÖ·ÇÔ¤ÆÚ½á¹ûÊÇ£¬¿ª·¢ÈËÔ±²»½öÒª¿¼ÂÇËã·¨ÊÇ·ñÕýÈ·£¬»¹Òª¿¼ÂÇÊÇ·ñÊǶàÏß³ÌÏȹØÎÊÌâµ¼Ö·ÇÔ¤ÆÚµÄ½á¹û¡£Ïà·´£¬µ¥Ï̱߳à³Ì¾ÍÏÔµÃÏà¶Ô¼òµ¥¡£Pipelineģʽ·Ç³£±ãÓÚÎÒÃDzÉÓõ¥Ïß³ÌģʽʵÏÖ¶Ô×ÓÈÎÎñµÄ´¦Àí¡£
PipelineģʽÖУ¬Ã¿¸öPipelineʵÀý¶¼ÊÇÒ»¸öPipeʵÀý£¬Òò´Ë£¬ÎÒÃÇ¿ÉÒÔÌí¼Ó³ÉÆäËûʵÀý£¬Õâ¾Í¼Ó´óÁ˸ÃģʽµÄÀ©Õ¹ÐÔºÍÁé»îÐÔ¡£
µ±È»PipelineģʽҲÓÐÒ»¶¨µÄ·çÏÕ£ºPipelineģʽÖи÷¸ö´¦Àí½×¶ÎËùÓõŤ×÷ÕßÏ̻߳òÕßÏ̳߳أ¬±íʾ¸÷¸ö½×¶ÎµÄÊäÈë/Êä³ö¶ÔÏóµÄ´´½¨ºÍÒ»¶¨£¨½ø³ö¶ÓÁУ©¶¼ÓÐÆä×ÔÉíµÄʱ¼äºÍ¿Õ¼ä¿ªÏú£¬ËùÒÔʹÓÃPipelineģʽµÄʱºòÐèÒª¿¼ÂÇËüËù¸¶³öµÄ´ú¼Û¡£½¨Òé´¦Àí¹æÄ£½Ï´óµÄÈÎÎñ£¬·ñÔò¿ÉÄܵò»³¥Ê§¡£
ģʽÐèҪעÒâµÄ¶«Î÷
1.PipelineµÄÉî¶È:PipelineÖÐPipeµÄ¸öÊý±»³Æ×÷PipelineµÄÉî¶È¡£ËùÒÔÎÒÃÇÔÚÓÃPipelineµÄÉî¶ÈÓëJVMËÞÖ÷»úµÄCPU¸öÊý¼äµÄ¹ØÏµ¡£Èç¹ûPipelineʵÀýËù´¦µÄÈÎÎñ¶àÊôÓÚCPUÃܼ¯ÐУ¬ÄÇôÉî¶È×îºÃ²»³¬¹ýNcpu¡£Èç¹ûPipelineËù´¦ÀíµÄÈÎÎñ¶àÊôÓÚI/OÃܼ¯ÐÍ£¬ÄÇôPipelineµÄÉî¶È×îºÃ²»Òª³¬¹ý2*Ncpu¡£
2.»ùÓÚÏ̳߳صÄPipe£ºÈç¹ûPipeʵÀýʹÓÃÏ̳߳أ¬ÓÉÓÚÓжà¸öPipeʵÀý£¬¸üÈÝÒ׳öÏÖÏß³ÌËÀËøµÄÎÊÌ⣬ÐèÒª×Ðϸ¿¼ÂÇ¡£
3.´íÎó´¦Àí£ºPipeʵÀý¶ÔÆäÈÎÎñ½øÐйý³ÌÖÐÅܳöµÄÒì³£¿ÉÄÜÐèÒªÏàÓ¦PipeʵÀýÖ®Íâ½øÐд¦Àí¡£´Ëʱ£¬´¦Àí·½·¨Í¨³£ÓÐÁ½ÖÖ£ºÒ»ÊǸ÷¸öPipeʵÀý²¶»ñµ½Òì³£ºóµ÷ÓÃPipeContextʵÀýµÄhandleError½øÐдíÎó´¦Àí¡£ÁíÒ»¸öÊÇ´´½¨Ò»¸öרߺÔð´íÎÒ´¦ÀíµÄPipeʵÀý£¬ÆäËûPipeʵÀý²¶»ñÒì³£ºóÌá½»Ïà¹ØÊý¾Ý¸ø¸ÃPipeʵÀý´¦Àí¡£
4.¿ÉÅäÖõÄPipeline£ºPipelineģʽ¿ÉÒÔÓôúÂëµÄ·½Ê½½«Èô¸É¸öPipeʵÀýÌí¼Ó£¬¿ÉÒÔÓÃÅäÖÃÎļþµÄ·½Ê½ÊµÏÖ¶¯Ì¬·½Ê½Ìí¼ÓPipe¡£
|