2015-05-03 9 views

risposta

6

Does commitOffsets sul blocco consumer di alto livello fino a quando gli offset non vengono eseguiti correttamente?

Sembra commitOffsets() loop attraverso ogni consumatore e chiede updatePersistentPath se il suo offset è cambiato, e se è così scrive dati via zkClient.writeData(path, getBytes(data)). Sembra che commitOffsets()compaia nel blocco fino a quando non vengono eseguiti tutti gli offset.

Ecco il codice sorgente per commitOffsets() (ref):

public void commitOffsets() { 
    if (zkClient == null) { 
     logger.error("zk client is null. Cannot commit offsets"); 
     return; 
    } 
    for (Entry<String, Pool<Partition, PartitionTopicInfo>> e : topicRegistry.entrySet()) { 
     ZkGroupTopicDirs topicDirs = new ZkGroupTopicDirs(config.getGroupId(), e.getKey()); 
     for (PartitionTopicInfo info : e.getValue().values()) { 
      final long lastChanged = info.getConsumedOffsetChanged().get(); 
      if (lastChanged == 0) { 
       logger.trace("consume offset not changed"); 
       continue; 
      } 
      final long newOffset = info.getConsumedOffset(); 
      //path: /consumers/<group>/offsets/<topic>/<brokerid-partition> 
      final String path = topicDirs.consumerOffsetDir + "/" + info.partition.getName(); 
      try { 
       ZkUtils.updatePersistentPath(zkClient, path, "" + newOffset); 
      } catch (Throwable t) { 
       logger.warn("exception during commitOffsets, path=" + path + ",offset=" + newOffset, t); 
      } finally { 
       info.resetComsumedOffsetChanged(lastChanged); 
       if (logger.isDebugEnabled()) { 
        logger.debug("Committed [" + path + "] for topic " + info); 
       } 
      } 
     } 
    } 
} 

e per updatePersistentPath(...) (ref):

public static void updatePersistentPath(ZkClient zkClient, String path, String data) { 
    try { 
     zkClient.writeData(path, getBytes(data)); 
    } catch (ZkNoNodeException e) { 
     createParentPath(zkClient, path); 
     try { 
      zkClient.createPersistent(path, getBytes(data)); 
     } catch (ZkNodeExistsException e2) { 
      zkClient.writeData(path, getBytes(data)); 
     } 
    } 
} 
Problemi correlati