Kafka in Depth, Part 5: The Low-Level Kafka Consumer API

 codingparty 2016-03-18
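Kafka provides two APIs for consumers:
  1. The high-level Consumer API
  2. The SimpleConsumer API

The first is a highly abstracted Consumer API that is simple and convenient to use. For some special requirements, however, we may need the second, lower-level API. Let's first look at what the second API lets us do:

  • Read a message multiple times
  • Consume only a subset of the messages in a partition within a single process
  • Add transaction management to ensure that each message is processed exactly once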
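
The third use case comes down to recording the offset together with the processing result, so that a message fetched again after a restart can be recognized and skipped. The sketch below illustrates that idea in memory only. `ExactlyOnceSink` and its methods are hypothetical names, not part of the Kafka API, and a real system would presumably persist the result and the offset in one database transaction.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sink that stores each processed message together with the next offset to read. */
class ExactlyOnceSink {
    private long nextOffset = 0;                          // lowest offset not yet processed
    private final List<String> processed = new ArrayList<>();

    /** Applies the message unless its offset was already processed; returns true if applied. */
    synchronized boolean apply(long offset, String message) {
        if (offset < nextOffset) {
            return false;                                 // duplicate delivery, skip it
        }
        // In a real system these two updates would be committed in a single transaction.
        processed.add(message);
        nextOffset = offset + 1;
        return true;
    }

    synchronized long nextOffset() {
        return nextOffset;
    }

    synchronized List<String> processed() {
        return new ArrayList<>(processed);
    }
}
```

After a restart, the consumer would resume fetching from `nextOffset()`, and any message delivered a second time is rejected by `apply`.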

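What are the drawbacks of using SimpleConsumer?

  • You must track the offset value in your application
  • You must find out which broker is the lead broker for the given topic partition
  • You must handle broker changes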
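
For the first drawback, one possible approach is to keep the next offset to read in a small local file. The following is a minimal sketch under that assumption. `FileOffsetStore`, its methods, and the `topic-partition` key format are hypothetical and are not part of Kafka.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

/** Hypothetical helper that persists the next offset to read for each topic partition. */
class FileOffsetStore {
    private final Path file;

    FileOffsetStore(Path file) {
        this.file = file;
    }

    /** Reads every stored offset; returns empty properties if the file does not exist yet. */
    private Properties readAll() {
        Properties props = new Properties();
        if (Files.exists(file)) {
            try (InputStream in = Files.newInputStream(file)) {
                props.load(in);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return props;
    }

    /** Returns the stored offset, or defaultOffset if none was saved for this partition. */
    long load(String topic, int partition, long defaultOffset) {
        String value = readAll().getProperty(topic + "-" + partition);
        return value == null ? defaultOffset : Long.parseLong(value);
    }

    /** Saves the next offset to read, keeping the entries of other partitions. */
    void save(String topic, int partition, long nextOffset) {
        Properties props = readAll();
        props.setProperty(topic + "-" + partition, Long.toString(nextOffset));
        try (OutputStream out = Files.newOutputStream(file)) {
            props.store(out, "next offset to read per topic-partition");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A consumer could call `load` once at startup to pick its starting offset and call `save` after each processed batch. The sketch is not safe for concurrent writers.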

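Steps for using SimpleConsumer

  1. Among all active brokers, find the one that is the leader broker for the given topic partition
  2. Find all replica brokers for the given topic partition
  3. Build the request
  4. Send the request to fetch the data
  5. Handle leader broker changes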
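
Step 5 usually means retrying the leader lookup a few times with a pause in between, since a new leader may not be elected immediately. The sketch below shows only that retry pattern. `LeaderRetry`, `findWithRetry`, and the `lookup` supplier are hypothetical and stand in for a real metadata request to the brokers.

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical retry helper for a leader lookup; the lookup itself is supplied by the caller. */
class LeaderRetry {
    /**
     * Calls lookup up to maxAttempts times, sleeping sleepMillis after each failed attempt,
     * and returns the first leader host found.
     */
    static String findWithRetry(Supplier<Optional<String>> lookup, int maxAttempts, long sleepMillis) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Optional<String> leader = lookup.get();
            if (leader.isPresent()) {
                return leader.get();
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();       // preserve the interrupt and give up
                throw new IllegalStateException("Interrupted while waiting for a new leader", ie);
            }
        }
        throw new IllegalStateException("Unable to find a leader after " + maxAttempts + " attempts");
    }
}
```

The caller decides what counts as "no leader yet" by returning an empty `Optional` from the supplier.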
Code example:
package bonree.consumer;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class SimpleExample {
  private List<String> m_replicaBrokers = new ArrayList<String>();

  public SimpleExample() {
    m_replicaBrokers = new ArrayList<String>();
  }

  public static void main(String args[]) {
    SimpleExample example = new SimpleExample();
    // Maximum number of messages to read
    long maxReads = Long.parseLong("3");
    // Topic to subscribe to
    String topic = "mytopic";
    // Partition to read from
    int partition = Integer.parseInt("0");
    // IP addresses of the broker nodes
    List<String> seeds = new ArrayList<String>();
    seeds.add("192.168.4.30");
    seeds.add("192.168.4.31");
    seeds.add("192.168.4.32");
    // Port
    int port = Integer.parseInt("9092");
    try {
      example.run(maxReads, topic, partition, seeds, port);
    } catch (Exception e) {
      System.out.println("Oops:" + e);
      e.printStackTrace();
    }
  }

  public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
    // Fetch the metadata for the given topic partition
    PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
    if (metadata == null) {
      System.out.println("Can't find metadata for Topic and Partition. Exiting");
      return;
    }
    if (metadata.leader() == null) {
      System.out.println("Can't find Leader for Topic and Partition. Exiting");
      return;
    }
    String leadBroker = metadata.leader().host();
    String clientName = "Client_" + a_topic + "_" + a_partition;

    SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
    long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
    int numErrors = 0;
    while (a_maxReads > 0) {
      if (consumer == null) {
        consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
      }
      FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();
      FetchResponse fetchResponse = consumer.fetch(req);

      if (fetchResponse.hasError()) {
        numErrors++;
        // Something went wrong!
        short code = fetchResponse.errorCode(a_topic, a_partition);
        System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
        if (numErrors > 5)
          break;
        if (code == ErrorMapping.OffsetOutOfRangeCode()) {
          // We asked for an invalid offset. For simple case ask for
          // the last element to reset
          readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
          continue;
        }
        consumer.close();
        consumer = null;
        leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
        continue;
      }
      numErrors = 0;

      long numRead = 0;
      for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
        long currentOffset = messageAndOffset.offset();
        if (currentOffset < readOffset) {
          System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
          continue;
        }

        readOffset = messageAndOffset.nextOffset();
        ByteBuffer payload = messageAndOffset.message().payload();

        // Size the array by the bytes left to read, which stays correct even if the position is not 0
        byte[] bytes = new byte[payload.remaining()];
        payload.get(bytes);
        System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
        numRead++;
        a_maxReads--;
      }

      if (numRead == 0) {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException ie) {
        }
      }
    }
    if (consumer != null)
      consumer.close();
  }

  public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
      System.out.println("Error fetching offset data from the Broker. Reason: " + response.errorCode(topic, partition));
      return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
  }

  /**
   * Finds a leader broker.
   *
   * @param a_oldLeader
   * @param a_topic
   * @param a_partition
   * @param a_port
   * @return String
   * @throws Exception
   */
  private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
    for (int i = 0; i < 3; i++) {
      boolean goToSleep = false;
      PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
      if (metadata == null) {
        goToSleep = true;
      } else if (metadata.leader() == null) {
        goToSleep = true;
      } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
        // first time through if the leader hasn't changed give
        // ZooKeeper a second to recover
        // second time, assume the broker did recover before failover,
        // or it was a non-Broker issue
        //
        goToSleep = true;
      } else {
        return metadata.leader().host();
      }
      if (goToSleep) {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException ie) {
        }
      }
    }
    System.out.println("Unable to find new leader after Broker failure. Exiting");
    throw new Exception("Unable to find new leader after Broker failure. Exiting");
  }

  private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
    PartitionMetadata returnMetaData = null;
    loop: for (String seed : a_seedBrokers) {
      SimpleConsumer consumer = null;
      try {
        consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
        List<String> topics = Collections.singletonList(a_topic);
        TopicMetadataRequest req = new TopicMetadataRequest(topics);
        kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

        List<TopicMetadata> metaData = resp.topicsMetadata();
        for (TopicMetadata item : metaData) {
          for (PartitionMetadata part : item.partitionsMetadata()) {
            if (part.partitionId() == a_partition) {
              returnMetaData = part;
              break loop;
            }
          }
        }
      } catch (Exception e) {
        System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
      } finally {
        if (consumer != null)
          consumer.close();
      }
    }
    if (returnMetaData != null) {
      m_replicaBrokers.clear();
      for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
        m_replicaBrokers.add(replica.host());
      }
    }
    return returnMetaData;
  }
}
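
The fetch loop in `run` copies each message payload out of a `ByteBuffer` before printing it. That step can be isolated into a small helper, sketched below. `PayloadDecoder` and `toUtf8String` are hypothetical names, and the sketch assumes the payload is UTF-8 text, as the example does.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that decodes a message payload as UTF-8 text. */
class PayloadDecoder {
    static String toUtf8String(ByteBuffer payload) {
        // Work on a duplicate so the caller's position and limit stay untouched.
        ByteBuffer view = payload.duplicate();
        // remaining() counts only the bytes between position and limit.
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

Using `StandardCharsets.UTF_8` instead of the string `"UTF-8"` also avoids the checked `UnsupportedEncodingException`.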
