日韩黑丝制服一区视频播放|日韩欧美人妻丝袜视频在线观看|九九影院一级蜜桃|亚洲中文在线导航|青草草视频在线观看|婷婷五月色伊人网站|日本一区二区在线|国产AV一二三四区毛片|正在播放久草视频|亚洲色图精品一区

分享

Hadoop源碼之我見

 gerial 2011-11-23

為了不遺忘和可以速查源碼,準備重新讀一遍Hadoop的MapReduce部分的源碼,記錄下來,盡量詳細點。如要轉(zhuǎn)載,請標明出處。

 

寫MapReduce程序首先接觸的是Job類,Job類是管理一個集群作業(yè)的類,包含了一個作業(yè)的所有信息和向集群提交作業(yè)的方法。

 

 

如圖所示,它有以上一些方法,我們寫程序是調(diào)用waitForCompletion()方法,方法實現(xiàn)如下:

 

 

  1. public boolean waitForCompletion(boolean verbose  
  2.                                  ) throws IOException, InterruptedException,  
  3.                                           ClassNotFoundException {  
  4.   if (state == JobState.DEFINE) {  
  5.     submit();  
  6.   }  
  7.   if (verbose) {  
  8.     jobClient.monitorAndPrintJob(conf, info);  
  9.   } else {  
  10.     info.waitForCompletion();  
  11.   }  
  12.   return isSuccessful();  
  13. }  
 

 

 

它調(diào)用了submit向集群提交作業(yè),下面看下submit()方法:

 

 

  1. public void submit() throws IOException, InterruptedException,   
  2.                             ClassNotFoundException {  
  3.   ensureState(JobState.DEFINE);  
  4. 建立新的API,檢查兼容性   
  5.   setUseNewAPI();  
  6.   info = jobClient.submitJobInternal(conf);  
  7.   state = JobState.RUNNING;  
  8.  }  
 

 

jobClient是在初始化時候建立的。

 

  1. public Job(Configuration conf) throws IOException {  
  2.   super(conf, null);  
  3.   jobClient = new JobClient((JobConf) getConfiguration());  
  4. }  
 

 

JobClient類 建立了一個代理,用于連接JobTracker(集群上的master結(jié)點),

 

  1. public JobClient(JobConf conf) throws IOException {  
  2.   setConf(conf);  
  3.   init(conf);  
  4. }  
  5. /** 
  6.  * Connect to the default {@link JobTracker}. 
  7.  * @param conf the job configuration. 
  8.  * @throws IOException 
  9.  */  
  10. public void init(JobConf conf) throws IOException {  
  11.   String tracker = conf.get("mapred.job.tracker""local");  
  12.   if ("local".equals(tracker)) {  
  13.     this.jobSubmitClient = new LocalJobRunner(conf);  
  14.   } else {  
  15.     this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);  
  16.   }          
  17. }  
 

 

這個代理會檢查mapred.job.tracker 這個屬性有沒有建立,默認值是local,如果建立了,則建立一個連接JobTracker的代理。這個代理負責上傳作業(yè)的配置和作業(yè)內(nèi)容到集群中。

 

  1. private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,  
  2.     Configuration conf) throws IOException {  
  3.   return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,  
  4.       JobSubmissionProtocol.versionID, addr, getUGI(conf), conf,  
  5.       NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));  
  6. }  
 

發(fā)現(xiàn)他實現(xiàn)了JobSubmissionProtocol接口的一個對象

  1. public static VersionedProtocol getProxy(Class<?> protocol,  
  2.     long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,  
  3.     Configuration conf, SocketFactory factory) throws IOException {      
  4.       
  5.   VersionedProtocol proxy =  
  6.       (VersionedProtocol) Proxy.newProxyInstance(  
  7.           protocol.getClassLoader(), new Class[] { protocol },  
  8.           new Invoker(addr, ticket, conf, factory));  
  9.   long serverVersion = proxy.getProtocolVersion(protocol.getName(),   
  10.                                                 clientVersion);  
  11.   if (serverVersion == clientVersion) {  
  12.     return proxy;  
  13.   } else {  
  14.     throw new VersionMismatch(protocol.getName(), clientVersion,   
  15.                               serverVersion);  
  16.   }  
  17. }  
 

 

 

總之,Job類使用了一個實現(xiàn)了JobSubmissionProtocol接口的一個代理,這個代理對象可以用來和集群通信,job類的一些方法也可以用來幫助我們對集群和任務的進展情況進行查看。

    本站是提供個人知識管理的網(wǎng)絡存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多