
Cassandra Source Code Analysis: The Storage Engine (2.2/3.0)

DecoratedKey and Token

A DecoratedKey, produced by decorating the raw byte-form key, is used in many places, for example on the read and write paths:

StorageService.getPartitioner() returns the cluster's one and only Partitioner. A cluster allows exactly one Partitioner to be configured in cassandra.yaml (e.g. partitioner: org.apache.cassandra.dht.Murmur3Partitioner); configuring more than one would conflict. Clients must also use the same Partitioner as the server: if the server uses Murmur3 while a client uses Random, the client fails at startup.

// Read path: decorate the raw key before querying the keyspace
public Row getRow(Keyspace keyspace) {
    DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
    return keyspace.getRow(new QueryFilter(dk, cfName, filter, timestamp));
}


After Murmur3Partitioner decorates a key, the result carries a Token. A Token is stateless data, so every new key creates a new Token object.

public class Murmur3Partitioner implements IPartitioner {
    public static final Murmur3Partitioner instance = new Murmur3Partitioner();

    public DecoratedKey decorateKey(ByteBuffer key) {
        long[] hash = getHash(key);
        return new PreHashedDecoratedKey(getToken(key, hash), key, hash[0], hash[1]);
    }

    private LongToken getToken(ByteBuffer key, long[] hash) {
        return new LongToken(normalize(hash[0]));
    }

    public static class LongToken extends Token {
        final long token;

        public LongToken(long token) { this.token = token; }

        public IPartitioner getPartitioner() { return instance; }
        public Object getTokenValue() { return token; }
    }
}

public abstract class Token implements RingPosition<Token>, Serializable {
    public static final TokenSerializer serializer = new TokenSerializer();

    public static abstract class TokenFactory {
        public abstract ByteBuffer toByteArray(Token token);
        public abstract Token fromByteArray(ByteBuffer bytes);
        public abstract String toString(Token token); // serialize as string, not necessarily human-readable
        public abstract Token fromString(String string); // deserialize
    }

    abstract public IPartitioner getPartitioner();
    abstract public Object getTokenValue();
}

The abstract class DecoratedKey holds both the key bytes and the Token. Its implementations are the on-heap BufferDecoratedKey and the off-heap NativeDecoratedKey. So once native (off-heap) objects are supported, even the lowest-level key object allocates its memory natively.

public abstract class DecoratedKey implements RowPosition, FilterKey {
    private final Token token;

    public Token getToken() { return token; }
    public abstract ByteBuffer getKey();
}

The lowest-level abstraction is actually not DecoratedKey but RingPosition.


A Token is a numeric value obtained by hashing the key, so hash collisions are possible: several keys may map to the same hash value. Key and Token are therefore not in one-to-one correspondence: a key determines exactly one Token, but a Token does not necessarily correspond to a unique key.
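A toy illustration of this many-to-one property, with String.hashCode standing in for Murmur3 (the collision pair below is a well-known hashCode example):

public class TokenSketch {
    public static void main(String[] args) {
        // The same key always yields the same token...
        System.out.println("abc".hashCode()); // 96354
        System.out.println("abc".hashCode()); // 96354
        // ...but two distinct keys can land on the same token:
        System.out.println("Aa".hashCode());  // 2112
        System.out.println("BB".hashCode());  // 2112
    }
}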

Native

public class NativeDecoratedKey extends DecoratedKey {
    final long peer; // raw off-heap address

    public NativeDecoratedKey(Token token, NativeAllocator allocator, OpOrder.Group writeOp, ByteBuffer key) {
        super(token);
        int size = key.remaining();
        // off-heap layout: [4-byte length][key bytes]
        this.peer = allocator.allocate(4 + size, writeOp);
        MemoryUtil.setInt(peer, size);
        MemoryUtil.setBytes(peer + 4, key);
    }

    public ByteBuffer getKey() {
        return MemoryUtil.getByteBuffer(peer + 4, MemoryUtil.getInt(peer));
    }
}

public class BufferDecoratedKey extends DecoratedKey {
    private final ByteBuffer key; // on-heap key bytes

    public BufferDecoratedKey(Token token, ByteBuffer key) {
        super(token);
        this.key = key;
    }

    public ByteBuffer getKey() { return key; }
}

http://normanmaurer.me/blog/2013/10/28/Lesser-known-concurrent-classes-Part-1/


(Figures from the linked post: the left-hand chart uses Atomic objects, occupying about 500MB in total; the right-hand chart uses volatile fields with an AtomicFieldUpdater, about 136MB.)
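A minimal sketch of that pattern (class names hypothetical): instead of allocating one AtomicLong object per instance, all instances share a single static AtomicLongFieldUpdater that operates on a plain volatile field, saving one object header and one reference per counter.

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

public class CounterStyles {
    // One extra AtomicLong object per instance.
    static class WithAtomic {
        final AtomicLong count = new AtomicLong();
        void inc() { count.incrementAndGet(); }
    }

    // One shared updater; each instance pays only for the 8-byte volatile field.
    static class WithUpdater {
        volatile long count;
        static final AtomicLongFieldUpdater<WithUpdater> UPDATER =
                AtomicLongFieldUpdater.newUpdater(WithUpdater.class, "count");
        void inc() { UPDATER.incrementAndGet(this); }
    }
}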

TokenMetadata

DataModel

OnDiskAtom ("on-disk atom") has two implementations: RangeTombstone and Cell. Cell in turn has several sub-interfaces: AbstractCell, CounterCell, ExpiringCell, DeletedCell. The deletion-related cases are all covered here: a TTL'd column is an ExpiringCell, an explicit column delete is a DeletedCell, and deleting a range of columns is a RangeTombstone. The abstract class for ordinary cells is AbstractCell, with two main implementation families: BufferCell, which lives on-heap, and NativeCell (via AbstractNativeCell), which lives off-heap.
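As a tree (condensed from the description above):

OnDiskAtom
|-- RangeTombstone                 (deletes a range of columns)
|-- Cell
    |-- AbstractCell
    |   |-- BufferCell             (on-heap)
    |   |-- AbstractNativeCell     (off-heap, NativeCell)
    |-- CounterCell / ExpiringCell (TTL) / DeletedCell (single-column delete)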


public class Row {
    public final DecoratedKey key;
    public final ColumnFamily cf;
}

public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry {
    protected final CFMetaData metadata;
}

public class ArrayBackedSortedColumns extends ColumnFamily {
    private DeletionInfo deletionInfo;
    private Cell[] cells;
    private int size;
    private int sortedSize;
}

public interface Cell extends OnDiskAtom {
    public CellName name();
    public ByteBuffer value();
}

public class BufferCell extends AbstractCell {
    protected final CellName name;    // cell name
    protected final ByteBuffer value; // cell value
    protected final long timestamp;   // every Cell carries a timestamp, used for conflict resolution
}

public class Keyspace {
    private final ConcurrentMap<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<>();
    private volatile KSMetaData metadata;
}

Both the Keyspace and ColumnFamily definitions carry metadata: KSMetaData and CFMetaData hold the keyspace-level and table-level configuration respectively.

The mapping to relational concepts:

Database:    Keyspace
Table:       ColumnFamily
Primary key: DecoratedKey
Row:         Row
Column:      Cell

A Row consists of a Key and a ColumnFamily, i.e. a primary key plus a family of (many) columns. A ColumnFamily is a family of Columns, and a Column is also called a Cell.

A ColumnFamily can be modeled as a two-level map:

Map<RowKey, SortedMap<ColumnKey, ColumnValue>>

Because the outer structure is a map, looking up a given row key is very fast; and because columns are stored sorted, scanning a run of columns or locating a single column is also efficient.
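A minimal sketch of this two-level model in plain Java (TreeMap standing in for the sorted column map; all keys and values illustrative):

import java.util.*;

public class ColumnFamilyModel {
    public static void main(String[] args) {
        // Map<RowKey, SortedMap<ColumnKey, ColumnValue>>
        Map<String, SortedMap<String, String>> cf = new HashMap<>();

        SortedMap<String, String> row = new TreeMap<>();
        row.put("seq-001:event", "{json-1}");
        row.put("seq-002:event", "{json-2}");
        row.put("seq-003:event", "{json-3}");
        cf.put("zqhxuyuan:tongdun", row);

        // Point lookup of a row key is a single hash probe:
        SortedMap<String, String> columns = cf.get("zqhxuyuan:tongdun");
        // Columns are sorted, so range scans over column names are efficient:
        System.out.println(columns.subMap("seq-001", "seq-003")); // seq-001..seq-002 entries
    }
}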


Comparator Types

CREATE TABLE velocity_app (
    attribute text,
    partner_code text,
    app_name text,
    type text,
    "timestamp" bigint,
    event text,
    sequence_id text,
    PRIMARY KEY ((attribute, partner_code, app_name, type), sequence_id)
) WITH CLUSTERING ORDER BY (sequence_id DESC);

insert into velocity_app (attribute, partner_code, app_name, type, timestamp, event, sequence_id)
values ('zqhxuyuan', 'tongdun', 'tongdun_app', 'login', 1111111111, '{jsondata}', '1111111111-1');

select * from velocity_app
where attribute='zqhxuyuan' and type='login' and partner_code='tongdun' and app_name='tongdun_app';


cqlsh:system> select * from schema_columnfamilies where keyspace_name='forseti' and columnfamily_name='velocity_app';

 keyspace_name               | forseti
 columnfamily_name           | velocity_app
 bloom_filter_fp_chance      | 0.01
 caching                     | {"keys":"ALL", "rows_per_partition":"NONE"}
 cf_id                       | 763248f0-8f88-11e6-a6b6-71d72bc0ba41
 comment                     |
 compaction_strategy_class   | org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy
 compaction_strategy_options | {}
 comparator                  | org.apache.cassandra.db.marshal.CompositeType(org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.UTF8Type),org.apache.cassandra.db.marshal.UTF8Type)
 compression_parameters      | {"sstable_compression":"org.apache.cassandra.io.compress.LZ4Compressor"}
 default_time_to_live        | 0
 default_validator           | org.apache.cassandra.db.marshal.BytesType
 dropped_columns             | null
 gc_grace_seconds            | 864000
 is_dense                    | False
 key_validator               | org.apache.cassandra.db.marshal.CompositeType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type)
 local_read_repair_chance    | 0.1
 max_compaction_threshold    | 32
 max_index_interval          | 2048
 memtable_flush_period_in_ms | 0
 min_compaction_threshold    | 4
 min_index_interval          | 128
 read_repair_chance          | 0
 speculative_retry           | 99.0PERCENTILE
 subcomparator               | null
 type                        | Standard

(1 rows)

cqlsh:system> select * FROM schema_columns where keyspace_name='forseti' and columnfamily_name='velocity_app';

keyspace_name | columnfamily_name | column_name | component_index | index_name | index_options | index_type | type | validator
---------------+-------------------+--------------+-----------------+------------+---------------+------------+----------------+----------------------------------------------------------------------------------------
forseti | velocity_app | app_name | 2 | null | null | null | partition_key | org.apache.cassandra.db.marshal.UTF8Type
forseti | velocity_app | attribute | 0 | null | null | null | partition_key | org.apache.cassandra.db.marshal.UTF8Type
forseti | velocity_app | event | 1 | null | null | null | regular | org.apache.cassandra.db.marshal.UTF8Type
forseti | velocity_app | partner_code | 1 | null | null | null | partition_key | org.apache.cassandra.db.marshal.UTF8Type
forseti | velocity_app | sequence_id | 0 | null | null | null | clustering_key | org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.UTF8Type)
forseti | velocity_app | timestamp | 1 | null | null | null | regular | org.apache.cassandra.db.marshal.LongType
forseti | velocity_app | type | 3 | null | null | null | partition_key | org.apache.cassandra.db.marshal.UTF8Type

(7 rows)

Field             | Value
comparator        | CompositeType(ReversedType(UTF8Type),UTF8Type)
default_validator | BytesType
key_validator     | CompositeType(UTF8Type,UTF8Type,UTF8Type,UTF8Type)

The comparator mirrors the clustering definition: a ReversedType(UTF8Type) component for sequence_id DESC plus a UTF8Type component for the CQL column name, while the key_validator has one UTF8Type component per partition-key column (attribute, partner_code, app_name, type).

CQLSSTableWriter

String schema = "CREATE TABLE myKs.myTable (k int PRIMARY KEY, v1 text, v2 int)";
String insert = "INSERT INTO myKs.myTable (k, v1, v2) VALUES (?, ?, ?)";
CQLSSTableWriter writer = CQLSSTableWriter.builder()
        .inDirectory("path/to/directory")
        .forTable(schema)
        .using(insert)
        .build();
writer.addRow(0, "test1", 24);
writer.addRow(1, "test2", null);
writer.addRow(2, "test3", 42);
writer.close();

SSTableWriter

An SSTable is built on top of a SequenceFile, and its on-disk data is stored in sorted order. An SSTable consists of a data file and an index file. In addition, to speed up reads, there are a BloomFilter and an IndexSummary. The index file records the position in the data file of every key, whereas the summary file stores only a sample of the keys, one entry per fixed number of keys. The summary file is therefore small and can be mapped straight into memory with mmap.
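A rough sketch of that sampling idea (the interval and entry sizes are illustrative; Cassandra's min_index_interval defaults to 128):

import java.util.*;

public class SummarySketch {
    public static void main(String[] args) {
        int interval = 128; // one summary entry per 128 index entries
        List<String> summaryKeys = new ArrayList<>();
        List<Long> summaryOffsets = new ArrayList<>();

        long indexOffset = 0;
        for (int i = 0; i < 1000; i++) {
            String key = String.format("key-%04d", i);
            if (i % interval == 0) { // sample every Nth key
                summaryKeys.add(key);
                summaryOffsets.add(indexOffset);
            }
            indexOffset += key.length() + 8; // pretend index-entry size
        }

        // Lookup: binary-search the small in-memory summary, then scan at most
        // one interval's worth of entries in the index file on disk.
        int slot = Collections.binarySearch(summaryKeys, "key-0300");
        if (slot < 0) slot = -slot - 2; // nearest preceding sampled key
        System.out.println("scan index file from offset " + summaryOffsets.get(slot));
    }
}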

SSTable handling is split into SSTableWriter and SSTableReader; the concrete implementations doing the file I/O are BigTableWriter and BigTableReader:

SSTable
|-- SSTableWriter
|   |-- BigTableWriter
|-- SSTableReader
    |-- BigTableReader

BigTableWriter.append() is the method that actually writes one row. The call chain shows when it runs: when a Memtable is flushed, its sorted in-memory contents are appended through BigTableWriter.

BigTableWriter.append(DecoratedKey, ColumnFamily)
|-- Memtable.writeSortedContents(File)
    |-- Memtable.flush()

The first parameter of append(), a DecoratedKey, is the row key; the second, a ColumnFamily, holds all of that key's columns. A ColumnFamily is a collection of Columns, and each Column has a ColumnName and a ColumnValue. With row key, column name, and column value, the data is complete.

IndexWriter

BigTableWriter writes the index and data files through an IndexWriter and a SequentialWriter respectively. The latter handles only the Data file, while the former takes care of the Index file plus the BloomFilter and Summary files.

public class BigTableWriter extends SSTableWriter {
    private final IndexWriter iwriter;       // hardworking index writer: also maintains the BF and IndexSummary
    private final SequentialWriter dataFile; // the lone data file

    public void append(DecoratedKey decoratedKey, ColumnFamily cf) {
        long startPosition = beforeAppend(decoratedKey); // start position of this row in the data file
        RowIndexEntry entry = rawAppend(cf, startPosition, decoratedKey, dataFile.stream); // returns the index entry
        long endPosition = dataFile.getFilePointer();
        afterAppend(decoratedKey, endPosition, entry);
    }

    private long beforeAppend(DecoratedKey decoratedKey) {
        return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
    }

    private void afterAppend(DecoratedKey decoratedKey, long dataEnd, RowIndexEntry index) {
        lastWrittenKey = decoratedKey;
        if (first == null) first = lastWrittenKey;
        iwriter.append(decoratedKey, index, dataEnd); // index file, BloomFilter, and IndexSummary are all updated here
        dbuilder.addPotentialBoundary(dataEnd);
    }
}

First, let's look at how IndexWriter writes the index file along with the BloomFilter and IndexSummary.

class IndexWriter extends AbstractTransactional implements Transactional {
    private final SequentialWriter indexFile; // the Index file
    public final SegmentedFile.Builder builder;
    public final IndexSummaryBuilder summary; // the IndexSummary file
    public final IFilter bf;                  // the BloomFilter file

    public void append(DecoratedKey key, RowIndexEntry indexEntry, long dataEnd) throws IOException {
        bf.add(key); // add the key to the BloomFilter, which behaves like a simple add-only collection
        long indexStart = indexFile.getFilePointer();
        ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile.stream);
        rowIndexEntrySerializer.serialize(indexEntry, indexFile.stream); // serialize the entry into the Index file
        long indexEnd = indexFile.getFilePointer();
        summary.maybeAddEntry(key, indexStart, indexEnd, dataEnd); // maybe add a sampled entry to the Summary file
        builder.addPotentialBoundary(indexStart);
    }
}

Write Data

The key and the ColumnFamily are enough to describe the data being written. ColumnIndex literally means an index over columns. Why index within a row? Because Cassandra tables are wide: a single row can hold as many as two billion columns, so an intra-row index clearly helps read performance. The out parameter below is the data file's output stream, so everything written here goes into the Data file.

private static RowIndexEntry rawAppend(ColumnFamily cf, long startPosition, DecoratedKey key, DataOutputPlus out) {
    ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, key.getKey(), out);
    ColumnIndex index = builder.build(cf); // writes the row's content through dataFile's stream
    out.writeShort(END_OF_ROW);            // end-of-row marker
    return RowIndexEntry.create(startPosition, cf.deletionInfo().getTopLevelDeletion(), index); // the row index entry
}

An IndexInfo consists of firstName and lastName (Composite: the first and last column names in an index block) plus offset and width (the block's position and length). A ColumnIndex holds a list of IndexInfo entries, so it represents the complete index over all of a row's columns.


public class ColumnIndex {
    public final List<IndexHelper.IndexInfo> columnsIndex;

    // Help to create an index for a column family based on size of columns, and write said columns to disk.
    public static class Builder {
        private final ColumnIndex result;
        private final DataOutputPlus output;
        private final ByteBuffer key;

        public ColumnIndex build(ColumnFamily cf) {
            for (Cell c : cf) add(c); // ignoring tombstones etc., this is very simple!
            ColumnIndex index = build();
            return index;
        }
    }
}

So a ColumnIndex is not the index of a single Column: it represents all the Columns of one row. A row may have many Columns, and one IndexInfo entry is produced for every blockSize bytes of serialized columns (column_index_size_in_kb, 64KB by default); for example, a row whose columns serialize to about 1MB yields roughly 1MB / 64KB = 16 IndexInfo entries.


public void add(OnDiskAtom column) throws IOException {
    if (firstColumn == null) { // first column of a block. Executed once per row? Wrong!
                               // A row has multiple blocks, and this runs once per block
        firstColumn = column;
        startPosition = endPosition;
        endPosition += tombstoneTracker.writeOpenedMarkers(firstColumn.name(), output, atomSerializer);
        blockSize = 0;
        maybeWriteRowHeader(); // the first column needs the row header written before it
    }
    if (tombstoneTracker.update(column, false)) {
        long size = tombstoneTracker.writeUnwrittenTombstones(output, atomSerializer);
        size += atomSerializer.serializedSizeForSSTable(column);
        endPosition += size;
        blockSize += size; // grow the block, so we can decide below whether to emit a column index entry
        atomSerializer.serializeForSSTable(column, output);
    }
    lastColumn = column; // the most recently added column

    // if we hit the column index size that we have to index after, go ahead and index it.
    if (blockSize >= DatabaseDescriptor.getColumnIndexSize()) { // every blockSize bytes, add one column index entry
        IndexHelper.IndexInfo cIndexInfo = new IndexHelper.IndexInfo(
                firstColumn.name(), column.name(), indexOffset + startPosition, endPosition - startPosition);
        result.columnsIndex.add(cIndexInfo);
        firstColumn = null; // reset to null so the next block re-enters the if block at the top
        lastBlockClosing = column;
    }
}

In the final build(), the first condition covers the case where add() never produced any IndexInfo; the second covers the case where it did, but the remaining columns did not fill a complete block, so one last IndexInfo must still be created for them.

public ColumnIndex build() {
    if (result.columnsIndex.isEmpty() || lastBlockClosing != lastColumn) {
        IndexHelper.IndexInfo cIndexInfo = new IndexHelper.IndexInfo(
                firstColumn.name(), lastColumn.name(), indexOffset + startPosition, endPosition - startPosition);
        result.columnsIndex.add(cIndexInfo);
    }
    return result;
}

The ColumnIndex is then used to create a RowIndexEntry. If there is only one index block (IndexInfo), a plain RowIndexEntry is created (no IndexInfo needs to be passed in); otherwise an IndexedEntry is created, carrying all of the ColumnIndex's IndexInfo entries.


position is simply the row's start offset in the data file; with it, the row-key index entry can be built. The IndexWriter that consumes it was analyzed above.

public static RowIndexEntry<IndexHelper.IndexInfo> create(long position, DeletionTime deletionTime, ColumnIndex index) {
    if (index.columnsIndex.size() > 1)
        return new IndexedEntry(position, deletionTime, index.columnsIndex);
    else
        return new RowIndexEntry<>(position); // a single index block needs only a plain RowIndexEntry
}

Tombstone

http://stackoverflow.com/questions/27776337/what-types-of-tombstones-does-cassandra-support

A tombstone marker can cover a single column ("d"), a range of columns, or a whole row. The DELETE syntax alone has several forms: delete from a table, delete a specific row, or delete specific columns of a row. A few tombstone types are listed below (the JSON is sstable2json-style output):

Tombstone type: column tombstone
SQL:    delete id from ts1 WHERE col1 = '3131';
Output: {"key": "3131","columns": [["id","54822130",1417814320400000,"d"]]}

Tombstone type: row tombstone
SQL:    delete from ts1 WHERE col1 = '31';
Output: {"key": "31","metadata": {"deletionInfo": {"markedForDeleteAt":1417814302304000,"localDeletionTime":1417814302}},"columns": []}

Tombstone type: list tombstone
SQL:    insert into flights (id, destinations) values ('BA1234', ['ORD', 'LHR']);
Output: ["1381316637599609:45787829:tags:_","1381316637599609:45787829:tags:!",1438264650252000,"t",1438264650]

The New Storage Engine in 3.0

http://thelastpickle.com/blog/2016/03/04/introductiont-to-the-apache-cassandra-3-storage-engine.html

Starting with the 3.x storage engine, Partitions, Rows, and Clustering are natively supported. A Partition is a collection of Rows that share the same Partition Key(s) and that are ordered, within the Partition, by their Clustering Key(s). Rows are then globally identified by their Primary Key: the combination of Partition Key and Clustering Key.

The important change is that the 3.x storage engine now knows about these ideas; it may seem strange, but previously it did not know about the Rows in a Partition. The new storage engine was created specifically to handle these concepts in a way that reduces storage requirements and improves performance.

http://www.datastax.com/2015/12/storage-engine-30

2.0: maps of (ordered) maps of binary data:

Map<byte[], SortedMap<byte[], Cell>>

The top-level keys of that map are the partition keys, and each partition (identified by its key) is a sorted key/value map. The inner values of that partition map are called Cells, mostly because each contains both a binary value and the timestamp used for conflict resolution.

3.0:

Map<byte[], SortedMap<Clustering, Row>>

At the top level, a table is still a map of partitions indexed by their partition key, and a partition is still a sorted map, but now it is a map of rows indexed by their "clustering". The Clustering holds the values of the clustering columns for the CQL row it represents, and the Row object represents that CQL row, associating each column with its value and timestamp.
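A toy side-by-side of the two shapes (all types heavily simplified, names hypothetical):

import java.util.*;

public class EngineShapes {
    public static void main(String[] args) {
        // 2.0: Map<partitionKey, SortedMap<cellName, cellValue>>.
        // The engine sees only opaque, sorted cell names; the CQL row with
        // clustering "seq1" is smeared across cells "seq1:event", "seq1:timestamp", ...
        Map<String, SortedMap<String, byte[]>> engine20 = new HashMap<>();
        SortedMap<String, byte[]> partition20 = engine20.computeIfAbsent("pk1", k -> new TreeMap<>());
        partition20.put("seq1:event", "{json}".getBytes());
        partition20.put("seq1:timestamp", "1111111111".getBytes());

        // 3.0: Map<partitionKey, SortedMap<clustering, row>>.
        // The engine itself groups the columns into a Row keyed by its clustering.
        Map<String, SortedMap<String, Map<String, byte[]>>> engine30 = new HashMap<>();
        SortedMap<String, Map<String, byte[]>> partition30 = engine30.computeIfAbsent("pk1", k -> new TreeMap<>());
        partition30.put("seq1", Map.of(
                "event", "{json}".getBytes(),
                "timestamp", "1111111111".getBytes()));

        System.out.println(engine20.get("pk1").keySet()); // [seq1:event, seq1:timestamp]
        System.out.println(engine30.get("pk1").keySet()); // [seq1]
    }
}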
