為了不遺忘和可以速查源碼,準備重新讀一遍Hadoop的MapReduce部分的源碼,記錄下來,盡量詳細點。如要轉(zhuǎn)載,請標明出處。
寫MapReduce程序首先接觸的是Job類,Job類是管理一個集群作業(yè)的類,包含了一個作業(yè)的所有信息和向集群提交作業(yè)的方法。

如圖所示,它有以上一些方法,我們寫程序是調(diào)用waitForCompletion()方法,方法實現(xiàn)如下:
- public boolean waitForCompletion(boolean verbose
- ) throws IOException, InterruptedException,
- ClassNotFoundException {
- if (state == JobState.DEFINE) {
- submit();
- }
- if (verbose) {
- jobClient.monitorAndPrintJob(conf, info);
- } else {
- info.waitForCompletion();
- }
- return isSuccessful();
- }
它調(diào)用了submit向集群提交作業(yè),下面看下submit()方法:
- public void submit() throws IOException, InterruptedException,
- ClassNotFoundException {
- ensureState(JobState.DEFINE);
- 建立新的API,檢查兼容性
- setUseNewAPI();
- info = jobClient.submitJobInternal(conf);
- state = JobState.RUNNING;
- }
jobClient是在初始化時候建立的。
- public Job(Configuration conf) throws IOException {
- super(conf, null);
- jobClient = new JobClient((JobConf) getConfiguration());
- }
JobClient類 建立了一個代理,用于連接JobTracker(集群上的master結(jié)點),
- public JobClient(JobConf conf) throws IOException {
- setConf(conf);
- init(conf);
- }
-
-
-
-
-
- public void init(JobConf conf) throws IOException {
- String tracker = conf.get("mapred.job.tracker", "local");
- if ("local".equals(tracker)) {
- this.jobSubmitClient = new LocalJobRunner(conf);
- } else {
- this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
- }
- }
這個代理會檢查mapred.job.tracker 這個屬性有沒有建立,默認值是local,如果建立了,則建立一個連接JobTracker的代理。這個代理負責上傳作業(yè)的配置和作業(yè)內(nèi)容到集群中。
- private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
- Configuration conf) throws IOException {
- return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
- JobSubmissionProtocol.versionID, addr, getUGI(conf), conf,
- NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
- }
發(fā)現(xiàn)他實現(xiàn)了JobSubmissionProtocol接口的一個對象
- public static VersionedProtocol getProxy(Class<?> protocol,
- long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
- Configuration conf, SocketFactory factory) throws IOException {
-
- VersionedProtocol proxy =
- (VersionedProtocol) Proxy.newProxyInstance(
- protocol.getClassLoader(), new Class[] { protocol },
- new Invoker(addr, ticket, conf, factory));
- long serverVersion = proxy.getProtocolVersion(protocol.getName(),
- clientVersion);
- if (serverVersion == clientVersion) {
- return proxy;
- } else {
- throw new VersionMismatch(protocol.getName(), clientVersion,
- serverVersion);
- }
- }
總之,Job類使用了一個實現(xiàn)了JobSubmissionProtocol接口的一個代理,這個代理對象可以用來和集群通信,job類的一些方法也可以用來幫助我們對集群和任務的進展情況進行查看。
|