学习一下InMemoryCompaction优化。
读取流程
- 活跃段(MutableSegment):以跳表方式扫描
- CellArrayImmutableSegment:以二分查找方式扫描
- CSLMImmutableSegment:以跳表方式扫描
数据的流转过程大概如下
MutableSegment → ImmutableSegment(s) → HFile
流程
getScanner
由于都是类LSM架构,因此方法类似,这里定义了一个类似Iterator的Scanner。
// HStore#getScanner: entry point for opening a scanner over this store (one column family).
// The storeEngine read lock guarantees the store's structure (segments / store files)
// cannot be swapped out while the scanner is being constructed.
public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols, long readPt)
throws IOException {
storeEngine.readLock();
try {
ScanInfo scanInfo;
// A coprocessor, if installed, may supply an overridden ScanInfo before the scan opens.
if (this.getCoprocessorHost() != null) {
scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this, scan);
} else {
scanInfo = getScanInfo();
}
return createScanner(scan, scanInfo, targetCols, readPt);
} finally {
storeEngine.readUnlock();
}
}传入的参数scan和targetCols和readPt
scan 的主要字段如下:
private byte[] startRow = HConstants.EMPTY_START_ROW; // 起始行
private boolean includeStartRow = true; // 是否包含起始行
private byte[] stopRow = HConstants.EMPTY_END_ROW; // 结束行
private boolean includeStopRow = false; // 是否包含结束行
private int maxVersions = 1; // 最大版本数
private int batch = -1; // 批处理大小
private boolean allowPartialResults = false; // 是否允许部分结果
private int storeLimit = -1; // 每个列族返回的最大结果数
private int storeOffset = 0; // 每个列族的行偏移量
private int caching = -1; // 扫描器缓存大小
private long maxResultSize = -1; // 最大结果大小
private boolean cacheBlocks = true; // 是否缓存块
private boolean reversed = false; // 是否逆序扫描
private TimeRange tr = TimeRange.allTime(); // 时间范围
private Map<byte[], NavigableSet<byte[]>> familyMap; // 列族和列的映射
private int limit = -1; // 行数限制
private ReadType readType = ReadType.DEFAULT; // 读取类型
private boolean needCursorResult = false; // 是否需要游标结果
long readPt:MVCC(多版本并发控制)读取点,是数据一致性的关键参数 首先需要加一个读锁,保证存储引擎被锁住,结构不发生变化。
然后是创建一个新的scanner
// HStore#createScanner: picks the scanner implementation by scan direction —
// a ReversedStoreScanner for reversed scans, a plain StoreScanner otherwise.
protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo,
NavigableSet<byte[]> targetCols, long readPt) throws IOException {
return scan.isReversed()
? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt)
: new StoreScanner(this, scanInfo, scan, targetCols, readPt);
}HStore
StoreScanner是对于HStore的Scanner,介于RegionScanner和KeyValueScanner之上,对应单个cf的数据。- 合并多个
Memstore和HFile的数据,保存TTL规则,确定返回一致性视图的数据。
// StoreScanner constructor for user scans: builds the query matcher, collects the
// underlying memstore/store-file scanners, seeks them to the scan start key, and
// combines them in a KeyValue heap.
public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
long readPt) throws IOException {
this(store, scan, scanInfo, columns != null ? columns.size() : 0, readPt, scan.getCacheBlocks(),
ScanType.USER_SCAN);
// Raw scans return all versions/delete markers, so an explicit column set is rejected.
if (columns != null && scan.isRaw()) {
throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
}
matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now,
store.getCoprocessorHost());
// Register so the store can notify us when its set of readers changes (e.g. after flush).
store.addChangedReaderObserver(this);
List<KeyValueScanner> scanners = null;
try {
// Pass columns to try to filter out unnecessary StoreFiles.
scanners = selectScannersFrom(store,
store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(),
scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt,
isOnlyLatestVersionScan(scan)));
// Seek all scanners to the start of the Row (or if the exact matching row
// key does not exist, then to the start of the next matching Row).
// Always check bloom filter to optimize the top row seek for delete
// family marker.
seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
parallelSeekEnabled);
// set storeLimit
this.storeLimit = scan.getMaxResultsPerColumnFamily();
// set rowOffset
this.storeOffset = scan.getRowOffsetPerColumnFamily();
addCurrentScanners(scanners);
// Combine all seeked scanners with a heap
resetKVHeap(scanners, comparator);
} catch (IOException e) {
clearAndClose(scanners);
// remove us from the HStore#changedReaderObservers here or we'll have no chance to
// and might cause memory leak
store.deleteChangedReaderObserver(this);
throw e;
}
}核心在于获取scanners的过程,先跳转到store.getScanners(...)
// HStore#getScanners: gathers scanners over both persistent store files (HFiles) and
// the memstore. File selection and memstore-scanner creation happen under the
// storeEngine read lock; the (potentially slow) opening of file scanners happens
// outside the lock, protected by a temporary refCount on the selected files.
public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,
boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow,
byte[] stopRow, boolean includeStopRow, long readPt, boolean onlyLatestVersion)
throws IOException {
Collection<HStoreFile> storeFilesToScan;
List<KeyValueScanner> memStoreScanners;
this.storeEngine.readLock();
try {
// Pick only the store files whose key range intersects [startRow, stopRow].
storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
includeStartRow, stopRow, includeStopRow, onlyLatestVersion);
memStoreScanners = this.memstore.getScanners(readPt);
// NOTE: here we must increase the refCount for storeFiles because we would open the
// storeFiles and get the StoreFileScanners for them.If we don't increase the refCount here,
// HStore.closeAndArchiveCompactedFiles called by CompactedHFilesDischarger may archive the
// storeFiles after a concurrent compaction.Because HStore.requestCompaction is under
// storeEngine lock, so here we increase the refCount under storeEngine lock. see HBASE-27484
// for more details.
HStoreFile.increaseStoreFilesRefeCount(storeFilesToScan);
} finally {
this.storeEngine.readUnlock();
}
try {
// First the store file scanners
// TODO this used to get the store files in descending order,
// but now we get them in ascending order, which I think is
// actually more correct, since memstore get put at the end.
List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(
storeFilesToScan, cacheBlocks, usePread, isCompaction, false, matcher, readPt);
List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);
scanners.addAll(sfScanners);
// Then the memstore scanners
scanners.addAll(memStoreScanners);
return scanners;
} catch (Throwable t) {
// Opening file scanners failed: release the memstore scanners to avoid leaks.
clearAndClose(memStoreScanners);
throw t instanceof IOException ? (IOException) t : new IOException(t);
} finally {
// The StoreFileScanners now hold their own references, so the temporary refCount
// taken above can be dropped in all cases.
HStoreFile.decreaseStoreFilesRefeCount(storeFilesToScan);
}
}可以发现首先获取内存和磁盘的存储
- storeFilesToScan:获取HFile对应的文件。
- memStoreScanners:根据传入的视图获取内存的scanner。
先看memStore,可以发现有两种获取的方式,一种是InMemoryCompaction,这里先看这个
// CompactingMemStore#getScanners: builds one scanner per non-empty segment across the
// three places data can live in memory — the active segment, the compaction pipeline
// segments, and the pre-flush snapshot segments.
@Override
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
MutableSegment activeTmp = getActive();
List<? extends Segment> pipelineList = pipeline.getSegments();
List<? extends Segment> snapshotList = snapshot.getAllSegments();
long numberOfSegments = 1L + pipelineList.size() + snapshotList.size();
// The list of elements in pipeline + the active element + the snapshot segment
List<KeyValueScanner> list = createList((int) numberOfSegments);
addToScanners(activeTmp, readPt, list);
addToScanners(pipelineList, readPt, list);
addToScanners(snapshotList, readPt, list);
return list;
}可以发现获取了3个Scanner,分别是
- 活跃段:类似 memtable。
- 管道段:压缩管道内的不可变的段,类似 immutable memtable。
- 快照段:准备刷到盘的段集合。
活跃段
可以发现最终获得的active是一个MutableSegment,主要包含以下字段:
- cellSet:存储实际数据的容器,通常是ConcurrentSkipListMap。
- comparator:单元格比较器,决定数据排序方式。
- memStoreLAB:内存分配缓冲区,优化小对象内存分配。
其实就是一个memtable。
管道段
这里就是处理合并操作的管道。可以发现实际是返回了readOnlyCopy,是一个LinkedList<ImmutableSegment>类型。
对于如何处理readOnlyCopy先搁置,而讨论ImmutableSegment是什么,是怎么读取的。
Snapshot
快照。
现在来看是怎么从Segment这种数据结构里面读取内容的
// Bulk overload: delegates to the single-segment addToScanners for each segment in order.
public static void addToScanners(List<? extends Segment> segments, long readPt,
List<KeyValueScanner> scanners) {
for (Segment item : segments) {
addToScanners(item, readPt, scanners);
}
}
// Single-segment overload: empty segments are skipped so no useless scanner enters the heap.
protected static void addToScanners(Segment segment, long readPt,
List<KeyValueScanner> scanners) {
if (!segment.isEmpty()) {
scanners.add(segment.getScanner(readPt));
}
}实际上是调用了addToScanners把一个Segment放到scanners里面。
scanners是一个List<KeyValueScanner>类型。
实际上KeyValueScanner也是一个虚基类,因此查看对应的实现,这里先看SegmentScanner,由于比较长,就不全部黏贴了,具体可以发现有以下几个API
- public ExtendedCell peek() // 查看当前Cell但不移动指针
- public ExtendedCell next() // 获取当前Cell并移动到下一个
- public boolean seek(ExtendedCell cell) // 定位到指定Cell或之后位置
- public boolean reseek(ExtendedCell cell) // 重新定位,优化的seek版本
- protected Iterator
getIterator(ExtendedCell cell) // 获取从指定Cell开始的迭代器 - protected void updateCurrent() // 更新当前Cell,跳过MVCC不相关的版本
列举了以下几个API,可以发现实际上就是一个Iterator,可以seek到某个key。
先看一下seek是怎么实现的
// SegmentScanner#seek: repositions the scanner at the first cell >= the given cell
// that is visible at this scanner's read point; returns whether such a cell exists.
@Override
public boolean seek(ExtendedCell cell) throws IOException {
if (closed) {
return false;
}
// A null seek key means this scanner is done; release it.
if (cell == null) {
close();
return false;
}
// restart the iterator from new key
iter = getIterator(cell);
// last is going to be reinitialized in the next getNext() call
last = null;
updateCurrent();
return (current != null);
}首先看一下getIterator(cell),显然是新建一个Iterator指向这个Cell
// Returns an iterator over the segment's cells starting at the first cell >= the given
// cell, via NavigableSet-style tailSet semantics on the underlying cell set.
protected Iterator<ExtendedCell> getIterator(ExtendedCell cell) {
return segment.tailSet(cell).iterator();
}实际上底层调用了CompositeImmutableSegment里的tailSet
(此处待补充)
然后看一下updateCurrent
// SegmentScanner#updateCurrent: advances `iter` to the first cell visible at this
// scanner's MVCC read point and stores it in `current` (null when nothing qualifies).
protected void updateCurrent() {
ExtendedCell next = null;
try {
while (iter.hasNext()) {
next = iter.next();
if (next.getSequenceId() <= this.readPoint) {
current = next;
return;// cell is visible at our read point; earlier iterations skipped irrelevant versions
}
// for backwardSeek() stay in the boundaries of a single row
if (stopSkippingKVsIfNextRow && segment.compareRows(next, stopSkippingKVsRow) > 0) {
current = null;
return;
}
} // end of while
current = null; // nothing found
} finally {
if (next != null) {
// in all cases, remember the last KV we iterated to, needed for reseek()
last = next;
}
}
}首先是判断视图是否合法,如果当前sequenceId比readPoint小,那么表示是合理的视图,直接返回。
可以发现整个seek的流程就是先二分到对应的Cell,然后根据MVCC给的视图找到合理的第一个Cell。
CellArrayImmutableSegment
这里看一下数组形式的ImmutableSegment,对于InMemoryCompaction来说,实际上是把将写优化的CSLMImmutableSegment(跳表结构)转换为读优化的CellArrayImmutableSegment(数组结构)。
InMemoryCompaction
核心代码如下
// CompactingMemStore#inMemoryCompaction: triggers one speculative in-memory compaction
// round on the pipeline; normally invoked from the in-memory flush path.
void inMemoryCompaction() {
// setting the inMemoryCompactionInProgress flag again for the case this method is invoked
// directly (only in tests) in the common path setting from true to true is idempotent
inMemoryCompactionInProgress.set(true);
// Used by tests
if (!allowCompaction.get()) {
return;
}
try {
// Speculative compaction execution, may be interrupted if flush is forced while
// compaction is in progress
// start() returns false when there is nothing to compact (empty pipeline),
// in which case we must mark the compaction cycle completed ourselves.
if (!compactor.start()) {
setInMemoryCompactionCompleted();
}
} catch (IOException e) {
LOG.warn("Unable to run in-memory compaction on {}/{}; exception={}",
getRegionServices().getRegionInfo().getEncodedName(), getFamilyName(), e);
}
}
// MemStoreCompactor#start: snapshots the current pipeline (as a versioned list), notifies
// coprocessors, and runs doCompaction(). Returns false if the pipeline is empty.
public boolean start() throws IOException {
if (!compactingMemStore.hasImmutableSegments()) { // no compaction on empty pipeline
return false;
}
// get a snapshot of the list of the segments from the pipeline,
// this local copy of the list is marked with specific version
versionedList = compactingMemStore.getImmutableSegments();
LOG.trace("Speculative compaction starting on {}/{}",
compactingMemStore.getStore().getHRegion().getRegionInfo().getEncodedName(),
compactingMemStore.getStore().getColumnFamilyName());
HStore store = compactingMemStore.getStore();
RegionCoprocessorHost cpHost = store.getCoprocessorHost();
if (cpHost != null) {
cpHost.preMemStoreCompaction(store);
}
try {
doCompaction();
} finally {
// post hook always fires, even if doCompaction bails out early.
if (cpHost != null) {
cpHost.postMemStoreCompaction(store);
}
}
return true;
}实际上核心调用了doCompaction
// MemStoreCompactor#doCompaction: asks the strategy for the next action (NOOP / FLATTEN /
// COMPACT / MERGE), builds the substitute segment if needed, and swaps it into the
// pipeline. Interruption is checked between phases so a forced flush can cancel us.
private void doCompaction() {
ImmutableSegment result = null;
boolean resultSwapped = false;
MemStoreCompactionStrategy.Action nextStep = strategy.getAction(versionedList);
boolean merge = (nextStep == MemStoreCompactionStrategy.Action.MERGE
|| nextStep == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS);
try {
if (isInterrupted.get()) { // if the entire process is interrupted cancel flattening
return; // the compaction also doesn't start when interrupted
}
if (nextStep == MemStoreCompactionStrategy.Action.NOOP) {
return;
}
if (
nextStep == MemStoreCompactionStrategy.Action.FLATTEN
|| nextStep == MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS
) {
// some Segment in the pipeline is with SkipList index, make it flat
compactingMemStore.flattenOneSegment(versionedList.getVersion(), nextStep);
return;
}
// Create one segment representing all segments in the compaction pipeline,
// either by compaction or by merge
if (!isInterrupted.get()) {
result = createSubstitution(nextStep);
}
// Substitute the pipeline with one segment
if (!isInterrupted.get()) {
// swap fails (returns false) if the pipeline version changed since we snapshotted it
resultSwapped = compactingMemStore.swapCompactedSegments(versionedList, result, merge);
if (resultSwapped) {
// update compaction strategy
strategy.updateStats(result);
// update the wal so it can be truncated and not get too long
compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater
}
}
} catch (IOException e) {
LOG.trace("Interrupting in-memory compaction for store={}",
compactingMemStore.getFamilyName());
Thread.currentThread().interrupt();
} finally {
// For the MERGE case, if the result was created, but swap didn't happen,
// we DON'T need to close the result segment (meaning its MSLAB)!
// Because closing the result segment means closing the chunks of all segments
// in the compaction pipeline, which still have ongoing scans.
if (!merge && (result != null) && !resultSwapped) {
result.close();
}
releaseResources();
compactingMemStore.setInMemoryCompactionCompleted();
}
}可以发现Compaction分为四种
- NOOP:啥都不干直接返回。
- FLATTEN:把一个基于跳表的Segment换成基于数组的Segment。
- COMPACT:把当前pipeline里的数据全部合并。
- MERGE:合并多个段但不进行数据处理,只是结构合并。
FLATTEN
核心函数是flattenOneSegment,这里用了一个乐观并发控制,在锁外和锁内分别检查一下version,如果不符合要求则直接返回。
然后是枚举pipeline中所有的Segment,看是否支持flatten,如果支持,则根据当前Segment创建一个数组类型的Segment。实际上只有基于跳表的Segment可以被flatten。
// CompactionPipeline#flattenOneSegment: replaces the first flatten-able (CSLM-indexed)
// segment in the pipeline with a flat-indexed copy. Uses optimistic concurrency control:
// the version is checked both outside and inside the pipeline lock, and the whole method
// bails out if a concurrent swap changed the pipeline.
public boolean flattenOneSegment(long requesterVersion, CompactingMemStore.IndexType idxType,
MemStoreCompactionStrategy.Action action) {
if (requesterVersion != version) {
LOG.warn("Segment flattening failed, because versions do not match. Requester version: "
+ requesterVersion + ", actual version: " + version);
return false;
}
synchronized (pipeline) {
// re-check under the lock (double-checked version validation)
if (requesterVersion != version) {
LOG.warn("Segment flattening failed, because versions do not match");
return false;
}
int i = -1;
for (ImmutableSegment s : pipeline) {
i++;
// only CSLM-based (skip-list) segments report canBeFlattened() == true
if (s.canBeFlattened()) {
s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed
if (s.isEmpty()) {
// after s.waitForUpdates() is called, there is no updates pending,if no cells in s,
// we can skip it.
continue;
}
// size to be updated
MemStoreSizing newMemstoreAccounting = new NonThreadSafeMemStoreSizing();
ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
(CSLMImmutableSegment) s, idxType, newMemstoreAccounting, action);
// in-place replacement: only the index representation changes, not the cells
replaceAtIndex(i, newS);
if (region != null) {
// Update the global memstore size counter upon flattening there is no change in the
// data size
MemStoreSize mss = newMemstoreAccounting.getMemStoreSize();
region.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(),
mss.getCellsCount());
}
LOG.debug("Compaction pipeline segment {} flattened", s);
return true;
}
}
}
// do not update the global memstore size counter and do not increase the version,
// because all the cells remain in place
return false;
}COMPACT
可以发现实际上是以下函数实现的,包括MERGE。
首先是获得当前所有Segment的集合,然后创建iterator
// MemStoreCompactor#createSubstitution: builds the single ImmutableSegment that will
// replace all pipeline segments — via a compacting iterator for COMPACT, or a merging
// iterator (no data rewrite) for MERGE.
private ImmutableSegment createSubstitution(MemStoreCompactionStrategy.Action action)
throws IOException {
ImmutableSegment result = null;
MemStoreSegmentsIterator iterator = null;
List<ImmutableSegment> segments = versionedList.getStoreSegments();
for (ImmutableSegment s : segments) {
s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed.
// we skip empty segment when create MemStoreSegmentsIterator following.
}
switch (action) {
case COMPACT:
iterator = new MemStoreCompactorSegmentsIterator(segments,
compactingMemStore.getComparator(), compactionKVMax, compactingMemStore.getStore());
result = SegmentFactory.instance().createImmutableSegmentByCompaction(
compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
versionedList.getNumOfCells(), compactingMemStore.getIndexType(), action);
iterator.close();
break;
case MERGE:
case MERGE_COUNT_UNIQUE_KEYS:
iterator = new MemStoreMergerSegmentsIterator(segments, compactingMemStore.getComparator(),
compactionKVMax);
result = SegmentFactory.instance().createImmutableSegmentByMerge(
compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
versionedList.getNumOfCells(), segments, compactingMemStore.getIndexType(), action);
iterator.close();
break;
default:
throw new RuntimeException("Unknown action " + action); // sanity check
}
return result;
}先看创立iterator的过程,可以发现首先搞了一个scanners,然后调用refillKVS
// Constructor: collects a scanner per non-empty segment (read point Long.MAX_VALUE so
// all cells are visible), wraps them in a compacting StoreScanner, and pre-fills the
// first batch of cells via refillKVS().
public MemStoreCompactorSegmentsIterator(List<ImmutableSegment> segments,
CellComparator comparator, int compactionKVMax, HStore store) throws IOException {
super(compactionKVMax);
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
AbstractMemStore.addToScanners(segments, Long.MAX_VALUE, scanners);
// build the scanner based on Query Matcher
// reinitialize the compacting scanner for each instance of iterator
compactingScanner = createScanner(store, scanners);
refillKVS();
}
// Pulls the next non-empty batch of cells from compactingScanner into `kvs` and resets
// `kvsIterator`. Returns false when the scanner is exhausted.
private boolean refillKVS() {
// if there is nothing expected next in compactingScanner
if (!hasMore) {
return false;
}
// clear previous KVS, first initiated in the constructor
kvs.clear();
for (;;) {
try {
// InternalScanner is for CPs so we do not want to leak ExtendedCell to the interface, but
// all the server side implementation should only add ExtendedCell to the List, otherwise it
// will cause serious assertions in our code
hasMore = compactingScanner.next(kvs, scannerContext);
} catch (IOException e) {
// should not happen as all data are in memory
throw new IllegalStateException(e);
}
if (!kvs.isEmpty()) {
kvsIterator = kvs.iterator();
return true;
} else if (!hasMore) {
return false;
}
// else: this batch was empty but more data may follow — loop and try again
}
}填充完之后开始准备Compaction,原理实际上很简单,这里封装的很好,实际上是将之前创立好的iterator传入createImmutableSegment里,新建一个ImmutableSegment
// SegmentFactory: allocates a fresh MemStoreLAB for the compaction result and delegates
// to createImmutableSegment to build the flat-indexed segment from the iterator.
public ImmutableSegment createImmutableSegmentByCompaction(final Configuration conf,
final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells,
CompactingMemStore.IndexType idxType, MemStoreCompactionStrategy.Action action)
throws IOException {
MemStoreLAB memStoreLAB = MemStoreLAB.newInstance(conf);
return createImmutableSegment(conf, comparator, iterator, memStoreLAB, numOfCells, action,
idxType);
}看如何新建一个Segment,这里关注CellArrayImmutableSegment是如何实现的
// SegmentFactory: picks the concrete flat-segment implementation by index type.
// CSLM_MAP (skip-list) is explicitly impossible here — only flat segments are created.
private ImmutableSegment createImmutableSegment(final Configuration conf,
final CellComparator comparator, MemStoreSegmentsIterator iterator, MemStoreLAB memStoreLAB,
int numOfCells, MemStoreCompactionStrategy.Action action,
CompactingMemStore.IndexType idxType) {
ImmutableSegment res = null;
switch (idxType) {
case CHUNK_MAP:
res = new CellChunkImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, action);
break;
case CSLM_MAP:
assert false; // non-flat segment can not be created here
break;
case ARRAY_MAP:
res = new CellArrayImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, action);
break;
}
return res;
}调用构造函数
// CellArrayImmutableSegment constructor: starts with a NULL cell set, accounts for the
// CellArrayMap overhead, then materializes the array-backed cell set from the iterator.
protected CellArrayImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
MemStoreLAB memStoreLAB, int numOfCells, MemStoreCompactionStrategy.Action action) {
super(null, comparator, memStoreLAB); // initiailize the CellSet with NULL
incMemStoreSize(0, DEEP_OVERHEAD_CAM, 0, 0); // CAM is always on-heap
// build the new CellSet based on CellArrayMap and update the CellSet of the new Segment
initializeCellSet(numOfCells, iterator, action);
}新建完之后,自然希望把这个ImmutableSegment替换当前的ImmutableSegment,拿着给的result进行填充
// (Excerpt from doCompaction): build the substitute segment, then try to swap it into
// the pipeline; bookkeeping only runs when the swap actually succeeded.
if (!isInterrupted.get()) {
result = createSubstitution(nextStep);
}
// Substitute the pipeline with one segment
if (!isInterrupted.get()) {
resultSwapped = compactingMemStore.swapCompactedSegments(versionedList, result, merge);
if (resultSwapped) {
// update compaction strategy
strategy.updateStats(result);
// update the wal so it can be truncated and not get too long
compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater
}
}将pipeline里的Segment进行替换
// CompactingMemStore: thin delegate to CompactionPipeline#swap. For MERGE (!merge == false
// is inverted here) the suffix segments are NOT closed, since the merged segment still
// references their MSLAB chunks.
public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result,
boolean merge) {
// last true stands for updating the region size
return pipeline.swap(versionedList, result, !merge, true);
}然后是swap。
首先还是进行乐观并发控制,即version检查,锁,version检查。然后获取当前pipeline里存储的Segment,通过swapSuffix进行替换,替换完后verison++,同时更新readOnlyCopy。
// CompactionPipeline#swap: atomically replaces the versioned suffix of the pipeline with
// the single compacted/merged segment. Optimistic concurrency: version is checked outside
// and again inside the lock; on success the version is bumped and readOnlyCopy refreshed.
// Region memstore-size counters are adjusted outside the lock by the suffix/new deltas.
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
justification = "Increment is done under a synchronize block so safe")
public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment,
boolean closeSuffix, boolean updateRegionSize) {
if (versionedList.getVersion() != version) {
return false;
}
List<ImmutableSegment> suffix;
synchronized (pipeline) {
// re-check under the lock: a concurrent swap/flatten may have bumped the version
if (versionedList.getVersion() != version) {
return false;
}
suffix = versionedList.getStoreSegments();
LOG.debug("Swapping pipeline suffix; before={}, new segment={}",
versionedList.getStoreSegments().size(), segment);
swapSuffix(suffix, segment, closeSuffix);
readOnlyCopy = new LinkedList<>(pipeline);
version++;
}
if (updateRegionSize && region != null) {
// update the global memstore size counter
long suffixDataSize = getSegmentsKeySize(suffix);
long suffixHeapSize = getSegmentsHeapSize(suffix);
long suffixOffHeapSize = getSegmentsOffHeapSize(suffix);
int suffixCellsCount = getSegmentsCellsCount(suffix);
long newDataSize = 0;
long newHeapSize = 0;
long newOffHeapSize = 0;
int newCellsCount = 0;
if (segment != null) {
newDataSize = segment.getDataSize();
newHeapSize = segment.getHeapSize();
newOffHeapSize = segment.getOffHeapSize();
newCellsCount = segment.getCellsCount();
}
// deltas are (old suffix) - (new segment); negated because compaction shrinks usage
long dataSizeDelta = suffixDataSize - newDataSize;
long heapSizeDelta = suffixHeapSize - newHeapSize;
long offHeapSizeDelta = suffixOffHeapSize - newOffHeapSize;
int cellsCountDelta = suffixCellsCount - newCellsCount;
region.addMemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta, -cellsCountDelta);
LOG.debug(
"Suffix data size={}, new segment data size={}, suffix heap size={},new segment heap "
+ "size={} suffix off heap size={}, new segment off heap size={}, suffix cells "
+ "count={}, new segment cells count={}",
suffixDataSize, newDataSize, suffixHeapSize, newHeapSize, suffixOffHeapSize, newOffHeapSize,
suffixCellsCount, newCellsCount);
}
return true;
}然后看swapSuffix,发现实际上是两步,第一步移除pipeline里对应的Segment,然后把当前Segment放入pipeline。
// CompactionPipeline#swapSuffix: removes the matched suffix segments from the pipeline,
// appends the replacement segment, and (for data compaction only) closes the old
// segments to release their MSLAB chunks.
private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
boolean closeSegmentsInSuffix) {
matchAndRemoveSuffixFromPipeline(suffix);
if (segment != null) {
pipeline.addLast(segment);
}
// During index merge we won't be closing the segments undergoing the merge. Segment#close()
// will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy
// from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data
// compaction, we would have copied the cells data from old MSLAB chunks into a new chunk
// created for the result segment. So we can release the chunks associated with the compacted
// segments.
if (closeSegmentsInSuffix) {
for (Segment itemInSuffix : suffix) {
itemInSuffix.close();
}
}
}主要看matchAndRemoveSuffixFromPipeline,从链表尾部开始枚举,向前一个一个对比是否和要删除的链表元素相同,如果不同则抛异常,否则最后把链表尾部一个一个摘除。
// CompactionPipeline#matchAndRemoveSuffixFromPipeline: verifies (by reference identity,
// walking both lists backwards from the tail) that `suffix` is exactly the tail of the
// pipeline, then removes that many segments from the pipeline's tail. Any mismatch is a
// code bug and throws IllegalStateException.
private void matchAndRemoveSuffixFromPipeline(List<? extends Segment> suffix) {
if (suffix.isEmpty()) {
return;
}
if (pipeline.size() < suffix.size()) {
throw new IllegalStateException(
"CODE-BUG:pipleine size:[" + pipeline.size() + "],suffix size:[" + suffix.size()
+ "],pipeline size must greater than or equals suffix size");
}
ListIterator<? extends Segment> suffixIterator = suffix.listIterator(suffix.size());
ListIterator<? extends Segment> pipelineIterator = pipeline.listIterator(pipeline.size());
int count = 0;
while (suffixIterator.hasPrevious()) {
Segment suffixSegment = suffixIterator.previous();
Segment pipelineSegment = pipelineIterator.previous();
// identity comparison on purpose: the suffix must be the very same segment objects
if (suffixSegment != pipelineSegment) {
throw new IllegalStateException("CODE-BUG:suffix last:[" + count + "]" + suffixSegment
+ " is not pipleline segment:[" + pipelineSegment + "]");
}
count++;
}
for (int index = 1; index <= count; index++) {
pipeline.pollLast();
}
}