
Cassandra Source Code Analysis: Network

Cassandra 2.2 source code analysis: Netty client and server, request handling, and the messaging service.

CassandraDaemon

The startup log shows the order in which the components start:

INFO  [main] 2016-10-11 15:39:47,410 ColumnFamilyStore.java:382 - Initializing system.sstable_activity
INFO [main] 2016-10-11 15:39:48,950 CacheService.java:111 - Initializing key cache with capacity of 49 MBs.
INFO [main] 2016-10-11 15:39:48,967 CacheService.java:133 - Initializing row cache with capacity of 0 MBs
INFO [main] 2016-10-11 15:39:48,972 CacheService.java:162 - Initializing counter cache with capacity of 24 MBs
INFO [main] 2016-10-11 15:39:48,974 CacheService.java:173 - Scheduling counter cache save to every 7200 seconds (going to save all keys).
INFO [main] 2016-10-11 15:39:49,080 ColumnFamilyStore.java:382 - Initializing system.hints
INFO [main] 2016-10-11 15:39:49,089 ColumnFamilyStore.java:382 - Initializing system.......
INFO [main] 2016-10-11 15:39:51,302 ColumnFamilyStore.java:382 - Initializing demo.test
INFO [main] 2016-10-11 15:39:51,716 Index.java:93 - Initializing Lucene index
INFO [main] 2016-10-11 15:39:52,405 Index.java:101 - Initialized index demo.test.idx
INFO [main] 2016-10-11 15:39:52,413 ColumnFamilyStore.java:382 - Initializing demo.tweets
INFO [main] 2016-10-11 15:39:52,419 AutoSavingCache.java:163 - Completed loading (1 ms; 21 keys) KeyCache cache
INFO [main] 2016-10-11 15:39:52,497 CommitLog.java:168 - Replaying bin/../data/commitlog/CommitLog-5-1474959171115.log, ....
INFO [main] 2016-10-11 15:39:52,739 CommitLog.java:170 - Log replay complete, 135 replayed mutations

INFO [main] 2016-10-11 15:39:52,969 StorageService.java:600 - Cassandra version: 2.2.6
INFO [main] 2016-10-11 15:39:52,969 StorageService.java:601 - Thrift API version: 20.1.0
INFO [main] 2016-10-11 15:39:52,969 StorageService.java:602 - CQL supported versions: 3.3.1 (default: 3.3.1)
INFO [main] 2016-10-11 15:39:53,010 IndexSummaryManager.java:85 - Initializing index summary manager with a memory pool size of 49 MB and a resize interval of 60 minutes

INFO [main] 2016-10-11 15:39:53,013 StorageService.java:621 - Loading persisted ring state
INFO [main] 2016-10-11 15:39:53,056 StorageService.java:794 - Starting up server gossip
INFO [main] 2016-10-11 15:39:53,247 MessagingService.java:540 - Starting Messaging Service on localhost/127.0.0.1:7000 (lo0)
INFO [main] 2016-10-11 15:39:53,318 StorageService.java:968 - Using saved tokens [-1036061867878377743, -1049032071638556980, ]
INFO [main] 2016-10-11 15:39:53,425 StorageService.java:1937 - Node localhost/127.0.0.1 state jump to NORMAL

INFO [main] 2016-10-11 15:39:53,785 Server.java:151 - Netty using Java NIO event loop
INFO [main] 2016-10-11 15:39:53,970 Server.java:185 - Starting listening for CQL clients on localhost/127.0.0.1:9042...
INFO [main] 2016-10-11 15:39:54,159 CassandraDaemon.java:439 - Not starting RPC server as requested. Use JMX (StorageService->startRPCServer()) or nodetool (enablethrift) to start it

Stopping Cassandra stops CassandraDaemon, Server (the nativeServer), and Gossiper. ThriftServer is not started by default, so there is nothing to stop for it.

INFO  [RMI TCP Connection(5)-127.0.0.1] 2016-10-11 15:49:05,275 CassandraDaemon.java:451 - Cassandra shutting down...
INFO [RMI TCP Connection(5)-127.0.0.1] 2016-10-11 15:49:05,286 Server.java:218 - Stop listening for CQL clients
INFO [StorageServiceShutdownHook] 2016-10-11 15:49:05,292 Gossiper.java:1448 - Announcing shutdown

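CassandraDaemon is the startup class. It has three main service classes:

  1. StorageService: the storage-related service
  2. ThriftServer: the Thrift protocol server
  3. Server: the server for the native transport protocol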
private static final CassandraDaemon instance = new CassandraDaemon();
public static void main(String[] args) {
instance.activate();
}
public void activate(){
setup();
start();
}
protected void setup(){
StorageService.instance.initServer();

int rpcPort = DatabaseDescriptor.getRpcPort();
int nativePort = DatabaseDescriptor.getNativeTransportPort();
thriftServer = new ThriftServer(rpcAddr, rpcPort, listenBacklog);
nativeServer = new org.apache.cassandra.transport.Server(nativeAddr, nativePort);
}
public void start() {
nativeServer.start();
thriftServer.start();
}

Ports in the configuration file and the classes that serve them:

storage_port: 7000              --> StorageService
native_transport_port: 9042 --> nativeServer
rpc_port: 9160 --> ThriftServer

start_native_transport: true --> native protocol enabled by default
start_rpc: false --> Thrift protocol disabled by default

CassandraDaemon's nested interface Server has two implementations: o.a.c.thrift.ThriftServer for the Thrift protocol, and o.a.c.transport.Server for the native binary protocol.
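The relationship between the daemon, the shared Server interface, and the two start flags can be sketched as follows. This is a minimal illustration, not Cassandra's code: ServerSketch, NamedServer, and DaemonSketch are hypothetical names, and the servers here only track a running flag.

```java
import java.util.ArrayList;
import java.util.List;

class DaemonSketch {
    final NamedServer nativeServer = new NamedServer("native:9042");
    final NamedServer thriftServer = new NamedServer("thrift:9160");

    // Start only the transports that the configuration enables.
    List<String> start(boolean startNativeTransport, boolean startRpc) {
        List<String> started = new ArrayList<>();
        if (startNativeTransport) {
            nativeServer.start();
            started.add(nativeServer.name);
        }
        if (startRpc) {
            thriftServer.start();
            started.add(thriftServer.name);
        }
        return started;
    }

    public static void main(String[] args) {
        DaemonSketch daemon = new DaemonSketch();
        // Defaults from the configuration above: native on, Thrift off.
        System.out.println(daemon.start(true, false));
    }
}

// Hypothetical stand-in for the nested CassandraDaemon.Server interface.
interface ServerSketch {
    void start();
    void stop();
    boolean isRunning();
}

// Minimal implementation that only tracks whether it is running.
class NamedServer implements ServerSketch {
    final String name;
    private volatile boolean running;

    NamedServer(String name) { this.name = name; }
    public void start() { running = true; }
    public void stop() { running = false; }
    public boolean isRunning() { return running; }
}
```

Because both transports sit behind one interface, the daemon can start and stop them uniformly without knowing which protocol each one speaks.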

ThriftServer

The cassandra.thrift file is in the interface directory of the distribution. It consists of three parts:

  1. Data structures (Column, SuperColumn, and so on)
  2. Types used by the service: ConsistencyLevel, ColumnParent, ColumnPath, SliceRange, KeyRange, KeySlice, Deletion, Mutation, TokenRange, ColumnDef, CfDef, KsDef, ColumnSlice, and others
  3. The service's API methods: get, get_slice, multiget_slice, get_range_slices, insert, add, remove, batch_mutate, get_multi_slice, and others
public class ThriftServer implements CassandraDaemon.Server {
public void start() {
CassandraServer iface = getCassandraServer();
server = new ThriftServerThread(address, port, backlog, getProcessor(iface), getTransportFactory());
server.start();
}
private static class ThriftServerThread extends Thread {
private final TServer serverEngine;
public ThriftServerThread(...) {
serverEngine = new TServerCustomFactory(DatabaseDescriptor.getRpcServerType()).buildTServer(args);
}
public void run() {
serverEngine.serve();
}
}
}
public class CustomTThreadPoolServer extends TServer {
public void serve() {
serverTransport_.listen();
stopped = false;
while (!stopped) {
TTransport client = serverTransport_.accept();
processorFactory_.getProcessor(client).process(input, output);
}
executorService.shutdown();
}
}

Take a get query as an example: the service in cassandra.thrift defines get as taking the row key, a ColumnPath, and a consistency level.

struct ColumnPath {
3: required string column_family,
4: optional binary super_column,
5: optional binary column,
}
service cassandra {
ColumnOrSuperColumn get(1:required binary key,
2:required ColumnPath column_path,
3:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
}

The server-side handler is in the TProcessor of the generated class o.a.c.thrift.Cassandra, under interface/thrift/gen-java:

public static class get<I extends Iface> extends org.apache.thrift.ProcessFunction<I, get_args> {
public get_result getResult(I iface, get_args args) throws org.apache.thrift.TException {
get_result result = new get_result();
result.success = iface.get(args.key, args.column_path, args.consistency_level);
return result;
}
}

This ends up calling the get method of o.a.c.thrift.CassandraServer:

public ColumnOrSuperColumn get(ByteBuffer key, ColumnPath column_path, ConsistencyLevel consistency_level) {
ThriftClientState cState = state();
String keyspace = cState.getKeyspace();
cState.hasColumnFamilyAccess(keyspace, column_path.column_family, Permission.SELECT);

CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_path.column_family);
org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);

SortedSet<CellName> names = new TreeSet<CellName>(metadata.comparator);
names.add(metadata.comparator.cellFromByteBuffer(column_path.column));
IDiskAtomFilter filter = new NamesQueryFilter(names);

ReadCommand command = ReadCommand.create(keyspace, key, column_path.column_family, now, filter);
Map<DecoratedKey, ColumnFamily> cfamilies = readColumnFamily(Arrays.asList(command), consistencyLevel, cState);
ColumnFamily cf = cfamilies.get(StorageService.getPartitioner().decorateKey(command.key));

List<ColumnOrSuperColumn> tcolumns = thriftifyColumnFamily(cf, metadata.isSuper() && column_path.column != null, false, now);
return tcolumns.get(0);
}

The query for the ReadCommand built from the client's arguments runs in readColumnFamily, which performs the read through the StorageProxy class:

protected Map<DecoratedKey, ColumnFamily> readColumnFamily(List<ReadCommand> commands, ConsistencyLevel consistency_level, ClientState cState) {
Map<DecoratedKey, ColumnFamily> columnFamilyKeyMap = new HashMap<DecoratedKey, ColumnFamily>();
schedule(DatabaseDescriptor.getReadRpcTimeout());
List<Row> rows = StorageProxy.read(commands, consistency_level, cState);
for (Row row: rows) {
columnFamilyKeyMap.put(row.key, row.cf);
}
return columnFamilyKeyMap;
}

Java Driver(Netty)

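DataStax's Java client is implemented with Netty, and Cassandra's native protocol server is also implemented with Netty.

To understand how the server receives data, it helps to first look at how the client sends it. A simple example that uses the driver to read the Cassandra version: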

Cluster cluster = Cluster.builder()
.addContactPoints(CONTACT_POINTS).withPort(PORT)
.build();
Session session = cluster.connect();
ResultSet rs = session.execute("select release_version from system.local");
Row row = rs.one();
String releaseVersion = row.getString("release_version");

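Session is the session object the client establishes with the server. Once connect succeeds, the network channel between client and server is already open. Connection is the object that handles the actual connection between the client and a server node.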


The DataStax driver is a Netty client and Cassandra's nativeServer is a Netty server, so the driver uses Bootstrap to connect to the server, and the server uses ServerBootstrap to accept client connections.

class Connection {
ListenableFuture<Void> initAsync() {
Bootstrap bootstrap = factory.newBootstrap();
ProtocolOptions protocolOptions = factory.configuration.getProtocolOptions();
bootstrap.handler(
new Initializer(this, protocolVersion, protocolOptions.getCompression().compressor(), protocolOptions.getSSLOptions(),
factory.configuration.getPoolingOptions().getHeartbeatIntervalSeconds(),
factory.configuration.getNettyOptions(),
factory.configuration.getCodecRegistry()));

ChannelFuture future = bootstrap.connect(address);
}
}

The handler the client installs is Initializer, and within the pipeline it builds, the handler that processes messages is Connection.Dispatcher:

private static class Initializer extends ChannelInitializer<SocketChannel> {
// Stateless handlers
private static final Message.ProtocolDecoder messageDecoder = new Message.ProtocolDecoder();
private static final Message.ProtocolEncoder messageEncoderV4 = new Message.ProtocolEncoder(ProtocolVersion.V4);
private static final Frame.Encoder frameEncoder = new Frame.Encoder();
private final Connection connection;
private final FrameCompressor compressor;
private final NettyOptions nettyOptions;
private final ChannelHandler idleStateHandler;
private final CodecRegistry codecRegistry;

protected void initChannel(SocketChannel channel) throws Exception {
// set the codec registry so that it can be accessed by ProtocolDecoder
channel.attr(Message.CODEC_REGISTRY_ATTRIBUTE_KEY).set(codecRegistry);
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("frameDecoder", new Frame.Decoder());
pipeline.addLast("frameEncoder", frameEncoder);
if (compressor != null) {
pipeline.addLast("frameDecompressor", new Frame.Decompressor(compressor));
pipeline.addLast("frameCompressor", new Frame.Compressor(compressor));
}
pipeline.addLast("messageDecoder", messageDecoder);
pipeline.addLast("messageEncoder", messageEncoderFor(protocolVersion));
pipeline.addLast("idleStateHandler", idleStateHandler);
pipeline.addLast("dispatcher", connection.dispatcher);
nettyOptions.afterChannelInitialized(channel);
}
}

Dispatcher appears to be responsible only for reading responses:

class Dispatcher extends SimpleChannelInboundHandler<Message.Response> {
protected void channelRead0(ChannelHandlerContext ctx, Message.Response response) throws Exception {
int streamId = response.getStreamId();
ResponseHandler handler = pending.remove(streamId);
handler.cancelTimeout();
handler.callback.onSet(Connection.this, response, System.nanoTime() - handler.startTime, handler.retryCount);
if (isClosed()) tryTerminate(false);
}
}

So where does the client send data? Let's follow session.execute from the example and look for where the message is sent.

public ResultSetFuture executeAsync(final Statement statement) {
DefaultResultSetFuture future = new DefaultResultSetFuture(this, cluster.manager.protocolVersion(), makeRequestMessage(statement, null));
new RequestHandler(this, future, statement).sendRequest();
return future;
}

makeRequestMessage creates the request, so sendRequest must be what actually sends it.

class RequestHandler {
void sendRequest() {
startNewExecution();
}
private void startNewExecution() {
//the future is the callback; it was built with makeRequestMessage, so the Request can be obtained from it
Message.Request request = callback.request();
SpeculativeExecution execution = new SpeculativeExecution(request, position);
runningExecutions.add(execution);
execution.sendRequest(); //send the request, which is wrapped in the execution
}

}

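SpeculativeExecution implements speculative execution. Its QueryPlan is the query plan: based on the load balancing policy configured on the client, it routes the request to one of the host nodes, and that host is the coordinator.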

class SpeculativeExecution implements Connection.ResponseCallback {
private final Message.Request request;

void sendRequest() {
Host host;
while (!isDone.get() && (host = queryPlan.next()) != null && !queryStateRef.get().isCancelled()) {
if (query(host)) return;
}
reportNoMoreHosts(this);
}
private boolean query(final Host host) {
HostConnectionPool currentPool = manager.pools.get(host);
if (allowSpeculativeExecutions && nextExecutionScheduled.compareAndSet(false, true))
scheduleExecution(speculativeExecutionPlan.nextExecution(host));
Connection connection = currentPool.borrowConnection(manager.configuration().getPoolingOptions().getPoolTimeoutMillis(), TimeUnit.MILLISECONDS);
write(connection, this);
return true;
}

private void write(Connection connection, Connection.ResponseCallback responseCallback) throws ConnectionException, BusyConnectionException {
connectionHandler = connection.write(responseCallback, statement.getReadTimeoutMillis(), false);
}
}
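The loop in sendRequest walks the query plan and stops at the first host that accepts the request. A minimal sketch of that control flow, using plain strings for hosts and a predicate in place of query(host); QueryPlanSketch and its method are hypothetical names, not driver API:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Optional;
import java.util.function.Predicate;

class QueryPlanSketch {
    // Walk the plan in order and return the first host that accepts the request.
    static Optional<String> sendRequest(Iterator<String> queryPlan, Predicate<String> query) {
        while (queryPlan.hasNext()) {
            String host = queryPlan.next();
            if (query.test(host)) {
                return Optional.of(host); // this host acts as the coordinator
            }
        }
        return Optional.empty(); // plays the role of reportNoMoreHosts
    }

    public static void main(String[] args) {
        Iterator<String> plan = Arrays.asList("10.0.0.1", "10.0.0.2", "10.0.0.3").iterator();
        // Pretend the first host has no usable connection.
        Optional<String> coordinator = sendRequest(plan, host -> !host.equals("10.0.0.1"));
        System.out.println(coordinator.orElse("no more hosts"));
    }
}
```

Hosts that were not tried stay in the iterator, which is what lets a later retry or speculative execution continue from the next host instead of starting over.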

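query appears to have dropped the request object, but write(connection, this) passes this, so the request can still be retrieved from it.

write passes the responseCallback along, and callback.request() brings the request back: the request object was never lost.

channel.writeAndFlush(request) is Netty's method for writing data; this is where the client sends the request object to the server.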

ResponseHandler write(ResponseCallback callback, long statementReadTimeoutMillis, boolean startTimeout) throws ConnectionException, BusyConnectionException {
ResponseHandler handler = new ResponseHandler(this, statementReadTimeoutMillis, callback);
dispatcher.add(handler);
Message.Request request = callback.request().setStreamId(handler.streamId);
if (DISABLE_COALESCING) { //write directly, without buffering
channel.writeAndFlush(request).addListener(writeHandler(request, handler));
} else { //buffer the write and flush it later
flush(new FlushItem(channel, request, writeHandler(request, handler)));
}
return handler;
}
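write registers a handler under a stream id before sending, and Dispatcher.channelRead0 later removes the handler by the stream id found on the response. A minimal sketch of that pairing, assuming a simple incrementing id (the real driver manages stream ids differently) and using CompletableFuture in place of ResponseHandler; StreamIdSketch is a hypothetical name:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class StreamIdSketch {
    // Handlers waiting for a response, keyed by stream id (like the dispatcher's pending map).
    private final Map<Integer, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final AtomicInteger nextStreamId = new AtomicInteger();

    // Like Connection.write: register the handler first, then tag the request with its id.
    int write(CompletableFuture<String> callback) {
        int streamId = nextStreamId.getAndIncrement();
        pending.put(streamId, callback);
        return streamId;
    }

    // Like Dispatcher.channelRead0: find the waiting handler by the response's stream id.
    boolean onResponse(int streamId, String body) {
        CompletableFuture<String> callback = pending.remove(streamId);
        if (callback == null) {
            return false; // no request is waiting on this stream id
        }
        callback.complete(body);
        return true;
    }

    public static void main(String[] args) {
        StreamIdSketch connection = new StreamIdSketch();
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        int firstId = connection.write(first);
        int secondId = connection.write(second);

        // Responses may come back in any order; the stream id pairs them up.
        connection.onResponse(secondId, "rows for second query");
        connection.onResponse(firstId, "rows for first query");
        System.out.println(first.join());
        System.out.println(second.join());
    }
}
```

This is why one connection can carry many in-flight requests: the response does not need to arrive in request order, only to carry the right stream id.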

nativeServer(Netty)

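The native server uses Netty. The Initializer bound to ServerBootstrap adds several handlers that together form the ChannelPipeline:

  1. Frame decoding and encoding
  2. Message decoding and encoding
  3. Message dispatch (Dispatcher)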
public class Server implements CassandraDaemon.Server {
private EventLoopGroup workerGroup;
private EventExecutor eventExecutorGroup;

private void run() {
eventExecutorGroup = new RequestThreadPoolExecutor();
boolean hasEpoll = enableEpoll ? Epoll.isAvailable() : false;
if (hasEpoll) {
workerGroup = new EpollEventLoopGroup();
} else {
workerGroup = new NioEventLoopGroup();
}
ServerBootstrap bootstrap = new ServerBootstrap()
.group(workerGroup)
.channel(hasEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_LINGER, 0)
.childOption(ChannelOption.SO_KEEPALIVE, DatabaseDescriptor.getRpcKeepAlive())
.childOption(ChannelOption.ALLOCATOR, CBUtil.allocator)
.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024)
.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
bootstrap.childHandler(new Initializer(this));
bootstrap.bind(socket);
}

private static class Initializer extends ChannelInitializer {
private final Server server;

protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addFirst("connectionLimitHandler", new ConnectionLimitHandler()); //connection limit
pipeline.addLast("frameDecoder", new Frame.Decoder(server.connectionFactory)); //frame decoding
pipeline.addLast("frameEncoder", new Frame.Encoder()); //frame encoding
pipeline.addLast("frameDecompressor", new Frame.Decompressor()); //frame decompression
pipeline.addLast("frameCompressor", new Frame.Compressor()); //frame compression
pipeline.addLast("messageDecoder", new Message.ProtocolDecoder()); //message body decoding
pipeline.addLast("messageEncoder", new Message.ProtocolEncoder()); //message body encoding
pipeline.addLast(server.eventExecutorGroup, "executor", new Message.Dispatcher()); //message dispatch
}
}
}

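Encoding and decoding mirror requests and responses: when the server receives a request it decodes what the client sent (ProtocolDecoder), and once processing finishes it encodes the response and sends it to the client (ProtocolEncoder).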

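Like Thrift, the CQL protocol requires data structures and operations to be defined up front; the protocol specification is in the doc directory. A Frame defines the layout of a message on the wire: the header is 9 bytes in total (40 + 32 = 72 bits, and 72 / 8 = 9 bytes), followed by a variable-length body. A Message is a typed message built on top of a Frame. This is why Initializer builds the ChannelPipeline with the Frame handlers first, then the Message handlers, and finally the Message Dispatcher, which matches how a request is processed: the server first receives the request, then works out its type, and only then handles it.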
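The 9-byte header can be read field by field. The sketch below assumes the v3/v4 layout from the native protocol specification: one byte of version (whose top bit marks a response), one byte of flags, a two-byte stream id, one byte of opcode, and a four-byte body length. FrameHeaderSketch is a hypothetical class for illustration, not Cassandra's Frame.Decoder.

```java
import java.nio.ByteBuffer;

class FrameHeaderSketch {
    static final int HEADER_LENGTH = 9; // 1 + 1 + 2 + 1 + 4 bytes

    final boolean isResponse;
    final int version;
    final int flags;
    final int streamId;
    final int opcode;
    final int bodyLength;

    // Reads exactly HEADER_LENGTH bytes from the buffer (big-endian, ByteBuffer's default).
    FrameHeaderSketch(ByteBuffer in) {
        int first = in.get() & 0xFF;
        isResponse = (first & 0x80) != 0; // top bit: 0 = request, 1 = response
        version = first & 0x7F;           // remaining 7 bits: protocol version
        flags = in.get() & 0xFF;
        streamId = in.getShort();
        opcode = in.get() & 0xFF;
        bodyLength = in.getInt();         // length of the body that follows the header
    }

    public static void main(String[] args) {
        // A v4 QUERY request header: version 0x04, no flags, stream 1, opcode 0x07, 42-byte body.
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH);
        buf.put((byte) 0x04).put((byte) 0x00).putShort((short) 1).put((byte) 0x07).putInt(42);
        buf.flip();
        FrameHeaderSketch h = new FrameHeaderSketch(buf);
        System.out.println("v" + h.version + " response=" + h.isResponse + " stream=" + h.streamId
                + " opcode=" + h.opcode + " length=" + h.bodyLength);
    }
}
```

The first five bytes account for the 40 bits in the arithmetic above and the length field for the remaining 32.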

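There are several message types, including ERROR, STARTUP, QUERY, RESULT, PREPARE, EXECUTE, EVENT, and BATCH, and each type is fixed as either a Request or a Response. Of the types listed here, ERROR, RESULT, and EVENT are Responses and the rest are Requests.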
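The fixed pairing of message type and direction can be modeled as an enum. This sketch covers only the types named above, with opcode values taken from the native protocol specification; MessageTypeSketch is a hypothetical name and does not mirror the fields of Cassandra's Message.Type.

```java
class MessageTypeSketch {
    enum Direction { REQUEST, RESPONSE }

    enum Type {
        ERROR(0x00, Direction.RESPONSE),
        STARTUP(0x01, Direction.REQUEST),
        QUERY(0x07, Direction.REQUEST),
        RESULT(0x08, Direction.RESPONSE),
        PREPARE(0x09, Direction.REQUEST),
        EXECUTE(0x0A, Direction.REQUEST),
        EVENT(0x0C, Direction.RESPONSE),
        BATCH(0x0D, Direction.REQUEST);

        final int opcode;
        final Direction direction;

        Type(int opcode, Direction direction) {
            this.opcode = opcode;
            this.direction = direction;
        }

        // Look up the type for an opcode read from a frame header.
        static Type fromOpcode(int opcode) {
            for (Type type : values()) {
                if (type.opcode == opcode) {
                    return type;
                }
            }
            throw new IllegalArgumentException("Unknown opcode " + opcode);
        }
    }

    public static void main(String[] args) {
        for (Type type : Type.values()) {
            System.out.println(type + " -> " + type.direction);
        }
    }
}
```

Fixing the direction per type lets a decoder reject, for example, a RESULT frame that arrives marked as a request.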

Dispatcher

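The server-side Dispatcher handler receives the request, executes it, and returns the response. The flush here is similar to the buffered flush the Netty client uses when sending requests; in both cases the goal is to send data to the peer (the client sends requests to the server, and the server sends responses to the client).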

public static class Dispatcher extends SimpleChannelInboundHandler<Request> {
public void channelRead0(ChannelHandlerContext ctx, Request request) {
ServerConnection connection = (ServerConnection)request.connection();
logger.trace("Received: {}, v={}", request, connection.getVersion());

Response response = request.execute(qstate); //the server executes the request
response.setStreamId(request.getStreamId());
response.attach(connection);
connection.applyStateTransition(request.type, response.type);

logger.trace("Responding: {}, v={}", response, connection.getVersion());
flush(new FlushItem(ctx, response, request.getSourceFrame()));
}
}

Taking a CQL query as an example, the trace log looks like this:

TRACE [SharedPool-Worker-1] 2016-10-11 17:02:27,345 Message.java:506 - Received: QUERY select * from velocity_app where attribute='zqhxuyuan' and type='login' and partner_code='tongdun' and app_name='tongdun_app';, v=4

TRACE [SharedPool-Worker-1] 2016-10-11 17:02:27,346 QueryProcessor.java:221 - Process org.apache.cassandra.cql3.statements.SelectStatement@37504b54 @CL.ONE
DEBUG [SharedPool-Worker-1] 2016-10-11 17:02:27,346 SliceQueryPager.java:92 - Querying next page of slice query; new filter: SliceQueryFilter [reversed=false, slices=[[, ]], count=100, toGroup = 1]
TRACE [SharedPool-Worker-1] 2016-10-11 17:02:27,347 ReadCallback.java:76 - Blockfor is 1; setting up requests to localhost/127.0.0.1
TRACE [SharedPool-Worker-1] 2016-10-11 17:02:27,347 AbstractReadExecutor.java:118 - reading data locally
TRACE [SharedPool-Worker-2] 2016-10-11 17:02:27,348 SliceQueryFilter.java:269 - collecting 0 of 100: 1111111111-1::false:0@1476176498640102
TRACE [SharedPool-Worker-2] 2016-10-11 17:02:27,348 SliceQueryFilter.java:269 - collecting 1 of 100: 1111111111-1:event:false:10@1476176498640102
TRACE [SharedPool-Worker-2] 2016-10-11 17:02:27,348 SliceQueryFilter.java:269 - collecting 1 of 100: 1111111111-1:timestamp:false:8@1476176498640102
TRACE [SharedPool-Worker-1] 2016-10-11 17:02:27,348 StorageProxy.java:1444 - Read: 1 ms.
DEBUG [SharedPool-Worker-1] 2016-10-11 17:02:27,349 AbstractQueryPager.java:95 - Fetched 1 live rows
DEBUG [SharedPool-Worker-1] 2016-10-11 17:02:27,349 AbstractQueryPager.java:112 - Got result (1) smaller than page size (100), considering pager exhausted
DEBUG [SharedPool-Worker-1] 2016-10-11 17:02:27,349 AbstractQueryPager.java:133 - Remaining rows to page: 2147483646

TRACE [SharedPool-Worker-1] 2016-10-11 17:02:27,349 Message.java:525 - Responding: ROWS [attribute(forseti, velocity_app), org.apache.cassandra.db.marshal.UTF8Type][partner_code(forseti, velocity_app), org.apache.cassandra.db.marshal.UTF8Type][app_name(forseti, velocity_app), org.apache.cassandra.db.marshal.UTF8Type][type(forseti, velocity_app), org.apache.cassandra.db.marshal.UTF8Type][sequence_id(forseti, velocity_app), org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.UTF8Type)][event(forseti, velocity_app), org.apache.cassandra.db.marshal.UTF8Type][timestamp(forseti, velocity_app), org.apache.cassandra.db.marshal.LongType]
| zqhxuyuan | tongdun | tongdun_app | login | 1111111111-1 | {jsondata} | 1111111111
---, v=4

With tracing on, the path the query takes through the server is shown:

Tracing session: 6eceb810-8f91-11e6-a2b4-dbe2eb0e3cb9

activity | timestamp | source | source_elapsed
--------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+-----------+----------------
Execute CQL3 query | 2016-10-11 17:02:27.345000 | 127.0.0.1 | 0
Parsing select * from velocity_app where attribute='zqhxuyuan' and type='login' and partner_code='tongdun' and app_name='tongdun_app'; [SharedPool-Worker-1] | 2016-10-11 17:02:27.345000 | 127.0.0.1 | 333
Preparing statement [SharedPool-Worker-1] | 2016-10-11 17:02:27.346000 | 127.0.0.1 | 730
Executing single-partition query on velocity_app [SharedPool-Worker-2] | 2016-10-11 17:02:27.347000 | 127.0.0.1 | 2236
Acquiring sstable references [SharedPool-Worker-2] | 2016-10-11 17:02:27.347000 | 127.0.0.1 | 2357
Merging memtable tombstones [SharedPool-Worker-2] | 2016-10-11 17:02:27.347000 | 127.0.0.1 | 2446
Skipped 0/0 non-slice-intersecting sstables, included 0 due to tombstones [SharedPool-Worker-2] | 2016-10-11 17:02:27.347000 | 127.0.0.1 | 2591
Merging data from memtables and 0 sstables [SharedPool-Worker-2] | 2016-10-11 17:02:27.347000 | 127.0.0.1 | 2661
Read 1 live and 0 tombstone cells [SharedPool-Worker-2] | 2016-10-11 17:02:27.348000 | 127.0.0.1 | 3240
Request complete | 2016-10-11 17:02:27.349147 | 127.0.0.1 | 4147

The timestamps on the first and last lines of the log file are essentially the same as those on the first and last lines of the tracing output.

//log file
TRACE [SharedPool-Worker-1] 2016-10-11 17:02:27,345 Message.java:506 - Received: QUERY
TRACE [SharedPool-Worker-1] 2016-10-11 17:02:27,349 Message.java:525 - Responding: ROWS

//CQL tracing on
Execute CQL3 query | 2016-10-11 17:02:27.345000
Request complete | 2016-10-11 17:02:27.349147
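The match can be checked with a little arithmetic on the timestamps shown above. The following sketch only uses java.time; TraceElapsedSketch is a hypothetical helper, and the log timestamps are written with a dot instead of a comma so that LocalTime can parse them.

```java
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;

class TraceElapsedSketch {
    // Microseconds between two time-of-day strings such as "17:02:27.345000".
    static long elapsedMicros(String start, String end) {
        return ChronoUnit.MICROS.between(LocalTime.parse(start), LocalTime.parse(end));
    }

    public static void main(String[] args) {
        // Tracing output: "Execute CQL3 query" to "Request complete".
        System.out.println(elapsedMicros("17:02:27.345000", "17:02:27.349147")); // 4147
        // Log file: "Received: QUERY" to "Responding: ROWS" (millisecond resolution).
        System.out.println(elapsedMicros("17:02:27.345", "17:02:27.349"));       // 4000
    }
}
```

The 4147 microseconds from the tracing timestamps equal the source_elapsed value on the "Request complete" row, and the log file shows the same interval rounded to 4 ms.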

QueryMessage

Take the o.a.c.transport.messages.QueryMessage request as an example:

public Message.Response execute(QueryState state) {
Tracing.instance.begin("Execute CQL3 query", state.getClientAddress(), builder.build());
Message.Response response = ClientState.getCQLQueryHandler().process(query, state, options, getCustomPayload());
return response;
}

The CQLQueryHandler implementation is QueryProcessor:

public ResultMessage process(String queryString, QueryState queryState, QueryOptions options) {
ParsedStatement.Prepared p = getStatement(queryString, queryState.getClientState());
options.prepare(p.boundNames);
CQLStatement prepared = p.statement;
return processStatement(prepared, queryState, options);
}
public ResultMessage processStatement(CQLStatement statement, QueryState queryState, QueryOptions options) {
logger.trace("Process {} @CL.{}", statement, options.getConsistency());
ClientState clientState = queryState.getClientState();
ResultMessage result = statement.execute(queryState, options);
return result == null ? new ResultMessage.Void() : result;
}

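Response response = request.execute(qstate): in our example the request is a QueryMessage (a Message.Request) and the result is a ResultMessage, which is a Message.Response.

This matches what was said about message types: QUERY is a Request and RESULT is a Response.

Moving from Message into Statement, and taking SelectStatement as the example, we finally see a call into StorageProxy like the one on the Thrift path.

Messages generally map to different Statements: the QueryMessage in this example maps to a SelectStatement, and Execute or Batch messages map to other Statements.

The request object is converted along the way: Request -> Statement -> Command. For example: query request -> SelectStatement -> ReadCommands.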

public ResultMessage.Rows execute(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException {
ConsistencyLevel cl = options.getConsistency();
int limit = getLimit(options);
Pageable command = getPageableCommand(options, limit, now);
int pageSize = getPageSize(options);
if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize))
return execute(command, options, limit, now, state); //query without paging
QueryPager pager = QueryPagers.pager(command, cl, state.getClientState(), options.getPagingState());
return execute(pager, options, limit, now, pageSize); //query with paging
}
private ResultMessage.Rows execute(Pageable command, QueryOptions options, int limit, long now, QueryState state) {
List<Row> rows = command instanceof Pageable.ReadCommands
? StorageProxy.read(((Pageable.ReadCommands)command).commands, options.getConsistency(), state.getClientState())
: StorageProxy.getRangeSlice((RangeSliceCommand)command, options.getConsistency());
return processResults(rows, options, limit, now);
}

Whether the request arrives through the Thrift ThriftServer or the binary-protocol Server, it ends up calling StorageProxy.


StorageProxy

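StorageProxy's read method takes one of two paths depending on whether the consistency level is serial: a regular read or a transactional read.

Cassandra implements its transaction support with Paxos; the corresponding read method is readWithPaxos.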

// Performs the actual reading of a row out of the StorageService, fetching a specific set of column names from a given column family.
public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, ClientState state) {
return consistencyLevel.isSerialConsistency()
? readWithPaxos(commands, consistencyLevel, state)
: readRegular(commands, consistencyLevel);
}
private static List<Row> readRegular(List<ReadCommand> commands, ConsistencyLevel consistencyLevel) {
List<Row> rows = fetchRows(commands, consistencyLevel);
return rows;
}
private static List<Row> fetchRows(List<ReadCommand> initialCommands, ConsistencyLevel consistencyLevel){
List<ReadCommand> commands = initialCommands;
// send out read requests
for (int i = 0; i < commands.size(); i++) {
ReadCommand command = commands.get(i);
AbstractReadExecutor exec = AbstractReadExecutor.getReadExecutor(command, consistencyLevel);
exec.executeAsync();
}
}

There are several read executor implementations, for example NeverSpeculatingReadExecutor, which never speculates. The actual read work is still carried by the ReadCommand.

private static class NeverSpeculatingReadExecutor extends AbstractReadExecutor {
public void executeAsync() {
makeDataRequests(targetReplicas.subList(0, 1));
if (targetReplicas.size() > 1)
makeDigestRequests(targetReplicas.subList(1, targetReplicas.size()));
}
}
private void makeRequests(ReadCommand readCommand, Iterable<InetAddress> endpoints) {
MessageOut<ReadCommand> message = null;
boolean hasLocalEndpoint = false;
for (InetAddress endpoint : endpoints) {
if (isLocalRequest(endpoint)) { //the targets include the local node
hasLocalEndpoint = true;
continue;
}
if (message == null)
message = readCommand.createMessage();
MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
}
if (hasLocalEndpoint) { //execute locally right away; remote responses are still outstanding, so a callback/handler is needed
StageManager.getStage(Stage.READ).maybeExecuteImmediately(new LocalReadRunnable(command, handler));
}
}
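The two steps above, splitting replicas into one data request plus digest requests and then separating the local endpoint from the remote ones, can be sketched together. Endpoints are plain strings here and a list of log lines stands in for actually sending messages; ReadTargetsSketch is a hypothetical name, not Cassandra code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ReadTargetsSketch {
    final String localEndpoint;
    final List<String> actions = new ArrayList<>(); // records what would be sent or run

    ReadTargetsSketch(String localEndpoint) {
        this.localEndpoint = localEndpoint;
    }

    // Same shape as executeAsync above: first replica gets the data request, the rest digests.
    void executeAsync(List<String> targetReplicas) {
        makeRequests("data", targetReplicas.subList(0, 1));
        if (targetReplicas.size() > 1) {
            makeRequests("digest", targetReplicas.subList(1, targetReplicas.size()));
        }
    }

    // Same shape as makeRequests above: remote endpoints get a message, the local read runs last.
    private void makeRequests(String kind, List<String> endpoints) {
        boolean hasLocalEndpoint = false;
        for (String endpoint : endpoints) {
            if (endpoint.equals(localEndpoint)) {
                hasLocalEndpoint = true;
                continue;
            }
            actions.add("send " + kind + " request to " + endpoint);
        }
        if (hasLocalEndpoint) {
            actions.add("read " + kind + " locally");
        }
    }

    public static void main(String[] args) {
        ReadTargetsSketch coordinator = new ReadTargetsSketch("127.0.0.1");
        coordinator.executeAsync(Arrays.asList("127.0.0.1", "10.0.0.2", "10.0.0.3"));
        coordinator.actions.forEach(System.out::println);
    }
}
```

Only one replica returns the full row; the others return a digest, which keeps network traffic low while still letting the coordinator detect replicas that disagree.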

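When CassandraDaemon started, we saw that every Cassandra node sets up both a Thrift server and a native server. Behind them, StorageProxy receives the various client requests, such as reads and writes.

In a distributed system, though, which nodes should handle a client's request? Cassandra has a coordinator role: the node that receives the client's request. That node may not be one that actually stores the data, so it forwards the request to the nodes that should store it. Reads work the same way as writes: if the data is not stored on the coordinator it cannot be read from the coordinator, so the coordinator sends the read request to the nodes that hold the data, waits for them to return it, and then returns it to the client.

The node that receives the request, the coordinator, is therefore the one that calls makeRequests to create the requests. If the data for the request also lives on the coordinator itself, the coordinator can handle it directly.

So when isLocalRequest holds, the command is executed on the local node right away through maybeExecuteImmediately. For remote nodes, the request carrying the command is sent with sendRRWithFailure, and the second argument, endpoint, decides which node it goes to.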

LocalReadRunnable wraps a ReadCommand and a callback. The actual read happens in command.getRow, which returns a Row. ReadCommand has two implementations: SliceFromReadCommand and SliceByNamesReadCommand.

static class LocalReadRunnable extends DroppableRunnable {
private final ReadCommand command;
private final ReadCallback<ReadResponse, Row> handler;

protected void runMayThrow() {
Keyspace keyspace = Keyspace.open(command.ksName);
Row r = command.getRow(keyspace);
ReadResponse result = ReadVerbHandler.getResponse(command, r);
handler.response(result);
}
}

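Here is how a command handed to StorageProxy on behalf of a client (such as a ReadCommand) moves through the server:

  1. An AbstractReadExecutor (for example NeverSpeculatingReadExecutor) is obtained and its executeAsync is called
  2. A LocalReadRunnable is created and run through maybeExecuteImmediately
  3. Inside LocalReadRunnable, runMayThrow calls ReadCommand.getRow, which does the actual read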


StorageService

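Before starting the Thrift and native servers, CassandraDaemon initializes StorageService. A freshly started Cassandra node tries to join the cluster, and the network-related piece of that is MessagingService.

Like ThriftServer and the native Netty Server, StorageService is a server-side component. The difference is that StorageService is responsible for storage, while the other two handle message transport and RPC calls.

Question: is StorageProxy a front-end proxy for StorageService, so that client requests pass through StorageProxy to reach StorageService? Or are StorageProxy and StorageService peers?

They appear to be peers: nothing in the code above shows StorageProxy calling StorageService. StorageService can also receive requests on its own (the startup log above, for instance, mentions calling StorageService->startRPCServer() over JMX).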

public synchronized void initServer(int delay) {
prepareToJoin();
}
private void prepareToJoin() {
Gossiper.instance.register(this);
Gossiper.instance.start(SystemKeyspace.incrementAndGetGeneration(), appStates);
MessagingService.instance().listen();
LoadBroadcaster.instance.startBroadcasting();
HintedHandOffManager.instance.start();
BatchlogManager.instance.start();
}

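Besides MessagingService, StorageService starts other classes related to storing and delivering data. Together they provide Cassandra's distributed storage behavior:

  1. Gossiper: the gossip protocol, which spreads cluster and node state so that nodes share a consistent view
  2. HintedHandOffManager: manages requests that temporarily failed because a node was unavailable
  3. BatchlogManager: manages the batchlog used for batch writes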

MessagingService

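The messaging service uses a plain ServerSocket. After the server-side thread starts, SocketThread accepts incoming connections, which carry either streaming traffic or ordinary messages.

Streaming messages come in several types and are mainly used for streaming data between nodes; sstableloader and nodetool repair, for example, both create streaming threads.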

http://www.datastax.com/dev/blog/streaming-in-cassandra-2-0

private void listen(InetAddress localEp) throws ConfigurationException {
for (ServerSocket ss : getServerSockets(localEp)) { //listen on the storage port
SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp);
th.start();
socketThreads.add(th);
}
}
public static class SocketThread extends Thread {
private final ServerSocket server;

public void run() {
Socket socket = server.accept();
DataInputStream in = new DataInputStream(socket.getInputStream());
int header = in.readInt();
boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
int version = MessagingService.getBits(header, 15, 8);
Thread thread = isStream
? new IncomingStreamingConnection(version, socket, connections)
: new IncomingTcpConnection(version, MessagingService.getBits(header, 2, 1) == 1, socket, connections);
thread.start();
}
}
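The accept thread decides between a streaming and a messaging connection from individual bits of the header int. The sketch below shows one way to implement such a bit extraction, consistent with the three calls above: getBits(header, 3, 1) for the stream flag, getBits(header, 2, 1) for the compression flag, and getBits(header, 15, 8) for the version. packHeader is a hypothetical helper added for the demonstration, and the version value 9 is only sample data.

```java
class HeaderBitsSketch {
    // Returns `count` bits of `packed` whose most significant bit sits at position `start`
    // (bit 0 is the least significant bit).
    static int getBits(int packed, int start, int count) {
        return (packed >>> (start + 1 - count)) & ~(-1 << count);
    }

    // Hypothetical helper: pack the fields into the positions the accept thread reads.
    static int packHeader(int version, boolean stream, boolean compressed) {
        int header = 0;
        if (compressed) {
            header |= 1 << 2; // bit 2: compression flag
        }
        if (stream) {
            header |= 1 << 3; // bit 3: streaming connection
        }
        header |= version << 8; // bits 8..15: messaging version
        return header;
    }

    public static void main(String[] args) {
        int header = packHeader(9, true, false);
        System.out.println("isStream   = " + (getBits(header, 3, 1) == 1));
        System.out.println("compressed = " + (getBits(header, 2, 1) == 1));
        System.out.println("version    = " + getBits(header, 15, 8));
    }
}
```

Packing the flags and the version into one int means the receiver can classify a connection after reading just four bytes.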

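IncomingTcpConnection is a background thread that keeps reading messages and hands each one to the MessagingService instance for processing.

Each ServerSocket has exactly one SocketThread. As excerpted above, run handles one accepted connection: it reads the header and hands the socket to a new thread (in the full source the accept call sits inside a loop, so the thread goes on to accept further connections). SocketThread never reads messages itself; keeping a connection's read loop running is the job of IncomingTcpConnection.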

public void run(){
receiveMessages();
}
private void receiveMessages() {
DataInput in = new DataInputStream(socket.getInputStream());
while (true){
receiveMessage(in, version);
}
}
private InetAddress receiveMessage(DataInput input, int version) throws IOException {
MessageIn message = MessageIn.read(input, version, id);
MessagingService.instance().receive(message, id, timestamp, isCrossNodeTimestamp);
return message.from;
}

A MessageIn is built from the input stream. Its most important field is verb, which determines what kind of message it is.

public class MessageIn<T> {
public final InetAddress from;
public final T payload;
public final Map<String, byte[]> parameters;
public final MessagingService.Verb verb;
public final int version;
}

When MessagingService.receive(MessageIn) gets a message it creates a MessageDeliveryTask, and each task runs in the thread pool of the Stage that matches the message:

public void receive(MessageIn message, int id, long timestamp, boolean isCrossNodeTimestamp) {
Runnable runnable = new MessageDeliveryTask(message, id, timestamp, isCrossNodeTimestamp);
LocalAwareExecutorService stage = StageManager.getStage(message.getMessageType());
stage.execute(runnable, ExecutorLocals.create(state));
}

MessageDeliveryTask is also a Runnable, but it is scheduled by a thread pool and is finished once it has run, unlike IncomingTcpConnection, which keeps looping.

public class MessageDeliveryTask implements Runnable {
private final MessageIn message;

public void run() {
MessagingService.Verb verb = message.verb;
IVerbHandler verbHandler = MessagingService.instance().getVerbHandler(verb);
verbHandler.doVerb(message, id);
}
}
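The lookup-then-invoke pattern in MessageDeliveryTask.run can be sketched with a map from verb to handler. Everything here is a simplified stand-in: the three-value Verb enum, the MessageIn holder, and the register method are hypothetical, and a Consumer plays the role of IVerbHandler.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Consumer;

class VerbDispatchSketch {
    enum Verb { READ, MUTATION, RANGE_SLICE }

    // Hypothetical message: a verb, the sending node, and a payload.
    static final class MessageIn {
        final Verb verb;
        final String from;
        final String payload;

        MessageIn(Verb verb, String from, String payload) {
            this.verb = verb;
            this.from = from;
            this.payload = payload;
        }
    }

    private final Map<Verb, Consumer<MessageIn>> verbHandlers = new EnumMap<>(Verb.class);

    void register(Verb verb, Consumer<MessageIn> handler) {
        verbHandlers.put(verb, handler);
    }

    // Like MessageDeliveryTask: the task only picks the handler by verb and invokes it.
    Runnable deliveryTask(MessageIn message) {
        return () -> {
            Consumer<MessageIn> handler = verbHandlers.get(message.verb);
            if (handler == null) {
                throw new IllegalStateException("No handler for verb " + message.verb);
            }
            handler.accept(message);
        };
    }

    public static void main(String[] args) {
        VerbDispatchSketch service = new VerbDispatchSketch();
        service.register(Verb.READ, m -> System.out.println("read " + m.payload + " for " + m.from));
        service.register(Verb.MUTATION, m -> System.out.println("apply " + m.payload + " from " + m.from));

        // In Cassandra the task would be handed to a stage's executor; here it runs inline.
        service.deliveryTask(new MessageIn(Verb.READ, "10.0.0.1", "key=42")).run();
    }
}
```

Keeping the task generic and the handlers per-verb means that adding a new message type only requires registering one more handler.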

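Now the scheduling, which goes through LocalAwareExecutorService, something like a thread pool. Note that execute does not run the task itself: it wraps the Runnable in a FutureTask and queues it to be run later.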

public abstract class AbstractLocalAwareExecutorService implements LocalAwareExecutorService {
public void execute(Runnable command, ExecutorLocals locals) {
addTask(newTaskFor(command, null, locals));
}
protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result, ExecutorLocals locals) {
if (locals != null) {
return new LocalSessionFutureTask<T>(runnable, result, locals);
}
return new FutureTask<>(runnable, result);
}
}
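The separation between submitting a task and running it can be demonstrated with a plain queue of FutureTask objects. This is a single-threaded sketch: DeferredExecuteSketch and runPending are hypothetical names, and runPending stands in for the worker threads that would normally drain the queue.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

class DeferredExecuteSketch {
    private final Queue<FutureTask<?>> tasks = new ArrayDeque<>();

    // Like execute above: wrap the Runnable and enqueue it; nothing runs yet.
    void execute(Runnable command) {
        tasks.add(new FutureTask<Void>(command, null));
    }

    // Stand-in for a worker thread: run everything that has been queued so far.
    int runPending() {
        int ran = 0;
        FutureTask<?> task;
        while ((task = tasks.poll()) != null) {
            task.run();
            ran++;
        }
        return ran;
    }

    public static void main(String[] args) {
        DeferredExecuteSketch executor = new DeferredExecuteSketch();
        AtomicInteger delivered = new AtomicInteger();

        executor.execute(delivered::incrementAndGet);
        System.out.println("after execute:    " + delivered.get());

        executor.runPending();
        System.out.println("after runPending: " + delivered.get());
    }
}
```

Because the caller only enqueues, the thread that received the message from the socket can return to reading the next message immediately.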

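Earlier we said the coordinator forwards client requests to non-local nodes; that uses OutboundTcpConnection. On the receiving side, a node reads those messages with the IncomingTcpConnection started under StorageService's MessagingService.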


Some parallels between StorageProxy and StorageService's MessagingService:

Service class | Executor | Command | Runnable | Method that does the work
StorageProxy | AbstractReadExecutor | ReadCommand | LocalReadRunnable | ReadCommand.getRow
MessagingService | LocalAwareExecutorService | MessageIn | MessageDeliveryTask/ExecutorLocals | IVerbHandler.doVerb(messageIn)

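The IVerbHandler interface

Like message types, IVerbHandler has many implementations.

Recall that on the StorageProxy path the ReadCommand's getRow is called directly, whereas on the IVerbHandler path the call is ReadVerbHandler.doVerb(messageIn), where the payload of messageIn is the ReadCommand.

So ReadVerbHandler is just a wrapper around ReadCommand; ReadVerbHandler.doVerb still ends up calling ReadCommand.getRow.

Why have both ReadCommand and ReadVerbHandler? ReadCommand only describes how the read is performed, while ReadVerbHandler both invokes the ReadCommand and is responsible for sending the reply.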

ReadVerbHandler

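Mutation is the write operation and Read is the read operation, and both send a response back to the requester. A read additionally has to wrap the resulting Row in the MessageOut. The difference between the two lies in the message payload: a ReadCommand for reads, a Mutation for writes. The read here is a lookup by a single primary key; a range query over keys is handled by RangeSliceVerbHandler instead.

MessageIn plays a role similar to Message.Request on the StorageProxy side, and MessageOut is the counterpart of Message.Response.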

public class ReadVerbHandler implements IVerbHandler<ReadCommand> {
public void doVerb(MessageIn<ReadCommand> message, int id) {
ReadCommand command = message.payload;
Keyspace keyspace = Keyspace.open(command.ksName);
Row row = command.getRow(keyspace);

MessageOut<ReadResponse> reply = new MessageOut<ReadResponse>(MessagingService.Verb.REQUEST_RESPONSE,
getResponse(command, row),
ReadResponse.serializer);
MessagingService.instance().sendReply(reply, id, message.from);
}

public static ReadResponse getResponse(ReadCommand command, Row row) {
if (command.isDigestQuery()) {
return new ReadResponse(ColumnFamily.digest(row.cf));
} else {
return new ReadResponse(row);
}
}
}
public class RangeSliceVerbHandler implements IVerbHandler<AbstractRangeCommand> {
public void doVerb(MessageIn<AbstractRangeCommand> message, int id) {
RangeSliceReply reply = new RangeSliceReply(message.payload.executeLocally());
MessagingService.instance().sendReply(reply.createMessage(), id, message.from);
}
}

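ReadVerbHandler calls getRow, where ReadCommand does the actual work, and then has to send the result back to the source node: a MessageIn carries not only the command itself but also where the command came from.

For example, if Server1 sends a ReadCommand to Server2 (Server1 wants to read from Server2), the message contains the ReadCommand and also records that it came from Server1.

So on Server2, once ReadVerbHandler has executed the ReadCommand, it returns the result to Server1.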
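The isDigestQuery branch in getResponse is what lets a replica answer with a hash instead of the full row. The sketch below shows the idea with the JDK's MessageDigest. The class and method names are hypothetical, rows are reduced to strings, and MD5 is used here as an illustrative choice of hash.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch: compare full data from one replica with a digest from another.
class DigestReadSketch {
    // What a replica would return for a digest query: a hash of the row contents.
    static byte[] digest(String rowData) {
        try {
            return MessageDigest.getInstance("MD5").digest(rowData.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // The requester hashes the full row it received and compares it with the other replica's digest.
    static boolean replicasAgree(String dataFromOneReplica, byte[] digestFromAnother) {
        return MessageDigest.isEqual(digest(dataFromOneReplica), digestFromAnother);
    }

    public static void main(String[] args) {
        byte[] remoteDigest = digest("k1:v1");
        System.out.println("same data agrees: " + replicasAgree("k1:v1", remoteDigest));
        System.out.println("stale data agrees: " + replicasAgree("k1:v0", remoteDigest));
    }
}
```

Only a few bytes cross the network for a digest reply, and a mismatch tells the requester that the replicas have diverged.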

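ReadCommand has two implementations, SliceFromReadCommand and SliceByNamesReadCommand. Either way, the read goes through Keyspace -> ColumnFamilyStore -> ColumnFamily.

StorageProxy.read also ends up at ReadCommand. So why are there two read paths? IVerbHandler is the message-receiving path: once a node receives a read command, it goes on to read from the keyspace.

StorageProxy, on the other hand, can be seen as the coordinator. If the target endpoints of a request include the local node, the data has to be read there too, but as a direct local call rather than through a received message.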

//SliceFromReadCommand
public Row getRow(Keyspace keyspace) {
CFMetaData cfm = Schema.instance.getCFMetaData(ksName, cfName);
DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
return keyspace.getRow(new QueryFilter(dk, cfName, filter, timestamp));
}

//Keyspace
public Row getRow(QueryFilter filter) {
ColumnFamilyStore cfStore = getColumnFamilyStore(filter.getColumnFamilyName());
ColumnFamily columnFamily = cfStore.getColumnFamily(filter);
return new Row(filter.key, columnFamily);
}

MutationVerbHandler

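Insert, Update, and Delete in Cassandra are all Mutations, so MutationVerbHandler handles all three.

Unlike a read, where full data may be read from only one node while the other nodes return just a digest, a write has to be sent to every replica.

MutationVerbHandler does not implement replication itself, of course. It applies the Mutation locally and, when the message carries a forward list, also passes the Mutation on to the nodes on that list.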

public class MutationVerbHandler implements IVerbHandler<Mutation> {
public void doVerb(MessageIn<Mutation> message, int id) throws IOException {
byte[] from = message.parameters.get(Mutation.FORWARD_FROM);
InetAddress replyTo;
if (from == null) {
replyTo = message.from;
byte[] forwardBytes = message.parameters.get(Mutation.FORWARD_TO);
if (forwardBytes != null)
forwardToLocalNodes(message.payload, message.verb, forwardBytes, message.from);
} else {
replyTo = InetAddress.getByAddress(from);
}
message.payload.apply(); // the key step: apply the mutation locally
WriteResponse response = new WriteResponse();
MessagingService.instance().sendOneWay(response.createMessage(), id, replyTo);
}

private void forwardToLocalNodes(Mutation mutation, MessagingService.Verb verb, byte[] forwardBytes, InetAddress from) throws IOException {
try (DataInputStream in = new DataInputStream(new FastByteArrayInputStream(forwardBytes))) {
int size = in.readInt();
// tell the recipients who to send their ack to
MessageOut<Mutation> message = new MessageOut<>(verb, mutation, Mutation.serializer).withParameter(Mutation.FORWARD_FROM, from.getAddress());
// Send a message to each of the addresses on our Forward List
for (int i = 0; i < size; i++) {
InetAddress address = CompactEndpointSerializationHelper.deserialize(in);
int id = in.readInt();
MessagingService.instance().sendOneWay(message, id, address);
}
}
}
}
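forwardToLocalNodes reads a count followed by (address, message id) pairs from the FORWARD_TO bytes. The round trip below sketches such an encoding. The exact byte layout (a one-byte address length before the raw address) and all names are assumptions made for illustration, not a statement about CompactEndpointSerializationHelper.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical forward-list codec: int count, then per target: length byte, address bytes, int id.
class ForwardListSketch {
    static byte[] encode(Map<InetAddress, Integer> targets) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(targets.size());
            for (Map.Entry<InetAddress, Integer> e : targets.entrySet()) {
                byte[] raw = e.getKey().getAddress();
                out.writeByte(raw.length);   // 4 for IPv4, 16 for IPv6
                out.write(raw);
                out.writeInt(e.getValue());  // message id the target should ack with
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Mirrors the loop in forwardToLocalNodes: read the size, then one (address, id) pair per target.
    static Map<InetAddress, Integer> decode(byte[] forwardBytes) {
        Map<InetAddress, Integer> targets = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(forwardBytes))) {
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                byte[] raw = new byte[in.readByte()];
                in.readFully(raw);
                targets.put(InetAddress.getByAddress(raw), in.readInt());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return targets;
    }

    // Convenience for building an IPv4 address without a DNS lookup.
    static InetAddress ip(int a, int b, int c, int d) {
        try {
            return InetAddress.getByAddress(new byte[] {(byte) a, (byte) b, (byte) c, (byte) d});
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        Map<InetAddress, Integer> targets = new LinkedHashMap<>();
        targets.put(ip(10, 0, 0, 2), 101);
        targets.put(ip(10, 0, 0, 3), 102);
        System.out.println(decode(encode(targets)).equals(targets));
    }
}
```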

Mutation.apply applies the Mutation through Keyspace -> ColumnFamilyStore, where we finally meet an object familiar from distributed storage systems: the Memtable.

//Mutation
public void apply() {
Keyspace ks = Keyspace.open(keyspaceName);
ks.apply(this, ks.getMetadata().durableWrites);
}

//Keyspace
public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes) {
try (OpOrder.Group opGroup = writeOrder.start()) {
ReplayPosition replayPosition = null;
if (writeCommitLog) replayPosition = CommitLog.instance.add(mutation);
DecoratedKey key = StorageService.getPartitioner().decorateKey(mutation.key());
for (ColumnFamily cf : mutation.getColumnFamilies()) {
ColumnFamilyStore cfs = columnFamilyStores.get(cf.id());
SecondaryIndexManager.Updater updater = updateIndexes
? cfs.indexManager.updaterFor(key, cf, opGroup)
: SecondaryIndexManager.nullUpdater;
cfs.apply(key, cf, updater, opGroup, replayPosition);
}
}
}

//ColumnFamilyStore
public void apply(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer, OpOrder.Group opGroup, ReplayPosition replayPosition) {
Memtable mt = data.getMemtableFor(opGroup, replayPosition);
mt.put(key, cf, indexer, opGroup);
maybeUpdateRowCache(key);
}
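The order of operations in Keyspace.apply (append to the commit log if durable writes are on, then put into the memtable) can be condensed into a toy model. Everything here is a hypothetical simplification: the commit log is a list, the memtable is a sorted map, and the replay position is just a list index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical toy write path: commit log first, memtable second.
class WritePathSketch {
    private final List<String> commitLog = new ArrayList<>();
    private final ConcurrentSkipListMap<String, String> memtable = new ConcurrentSkipListMap<>();

    // Returns the position of the commit log entry, or -1 when the commit log is skipped.
    int apply(String key, String value, boolean writeCommitLog) {
        int replayPosition = -1;
        if (writeCommitLog) {
            commitLog.add(key + "=" + value);
            replayPosition = commitLog.size() - 1;
        }
        memtable.put(key, value); // the in-memory, sorted structure that is later flushed to disk
        return replayPosition;
    }

    String read(String key) {
        return memtable.get(key);
    }

    int commitLogSize() {
        return commitLog.size();
    }

    public static void main(String[] args) {
        WritePathSketch keyspace = new WritePathSketch();
        System.out.println(keyspace.apply("k1", "v1", true));
        System.out.println(keyspace.apply("k2", "v2", false));
        System.out.println(keyspace.read("k2") + ", commit log entries: " + keyspace.commitLogSize());
    }
}
```

A write with durable writes disabled is still readable from the memtable, but nothing exists to replay it after a crash.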

Response

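Once MutationVerbHandler has applied the Mutation locally, it has to send the result back to the requesting node. Just as MessagingService uses IncomingTcpConnection -> MessageDeliveryTask to read and process an incoming request, it uses an OutboundTcpConnection thread to write the reply.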

public void sendOneWay(MessageOut message, int id, InetAddress to) {
OutboundTcpConnection connection = getConnection(to, message);
connection.enqueue(message, id); // put the message on the connection's queue
}

public OutboundTcpConnection getConnection(InetAddress to, MessageOut msg) {
return getConnectionPool(to).getConnection(msg);
}
public OutboundTcpConnectionPool getConnectionPool(InetAddress to) {
OutboundTcpConnectionPool cp = connectionManagers.get(to);
if (cp == null) {
cp = new OutboundTcpConnectionPool(to);
OutboundTcpConnectionPool existingPool = connectionManagers.putIfAbsent(to, cp);
if (existingPool != null) cp = existingPool;
else cp.start();
}
cp.waitForStarted();
return cp;
}
}
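getConnectionPool uses a common lock-free idiom: create a candidate, publish it with putIfAbsent, and start it only if this thread won the race. A minimal sketch with hypothetical names (Pool, starts, and string endpoints stand in for the real types):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical registry showing the get / putIfAbsent / start-once pattern.
class PoolRegistrySketch {
    static final class Pool {
        final String endpoint;
        final AtomicInteger starts = new AtomicInteger();

        Pool(String endpoint) {
            this.endpoint = endpoint;
        }

        void start() {
            starts.incrementAndGet();
        }
    }

    private final ConcurrentMap<String, Pool> pools = new ConcurrentHashMap<>();

    Pool getPool(String endpoint) {
        Pool pool = pools.get(endpoint);
        if (pool == null) {
            pool = new Pool(endpoint);
            Pool existing = pools.putIfAbsent(endpoint, pool);
            if (existing != null) pool = existing; // another thread registered first; drop ours
            else pool.start();                     // we registered it, so we start it exactly once
        }
        return pool;
    }

    public static void main(String[] args) {
        PoolRegistrySketch registry = new PoolRegistrySketch();
        Pool first = registry.getPool("10.0.0.2");
        Pool second = registry.getPool("10.0.0.2");
        System.out.println((first == second) + ", starts=" + first.starts.get());
    }
}
```

The losing candidate is simply discarded, which is cheap as long as constructing a pool does not open any connection.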

For each remote node, three OutboundTcpConnections are created. Starting the OutboundTcpConnectionPool starts all three of them.

OutboundTcpConnectionPool(InetAddress remoteEp) {
smallMessages = new OutboundTcpConnection(this);
largeMessages = new OutboundTcpConnection(this);
gossipMessages = new OutboundTcpConnection(this);
}
public void start(){
smallMessages.start();
largeMessages.start();
gossipMessages.start();
}

The last three lines of the nodetool netstats output (the Pool Name table) correspond to the three kinds of OutboundTcpConnection above.

➜  apache-cassandra-2.2.6 bin/nodetool -h 192.168.6.52 netstats
Mode: NORMAL
Not sending any streams.
Read Repair Statistics:
Attempted: 376745
Mismatch (Blocking): 0
Mismatch (Background): 0
Pool Name Active Pending Completed
Large messages n/a 0 9
Small messages n/a 0 2495610
Gossip messages n/a 0 2390273

OutboundTcpConnection.enqueue puts the message on a queue. A background thread takes messages off the queue and runs the write methods to send them out.

public void enqueue(MessageOut<?> message, int id) {
if (backlog.size() > 1024)
expireMessages();
backlog.put(new QueuedMessage(message, id));
}
public void run() {
final List<QueuedMessage> drainedMessages = new ArrayList<>(drainedMessageSize);
while (true) {
cs.coalesce(backlog, drainedMessages, drainedMessageSize);
for (QueuedMessage qm : drainedMessages) {
writeConnected(qm, count == 1 && backlog.isEmpty());
}
drainedMessages.clear();
}
}
private void writeConnected(QueuedMessage qm, boolean flush) {
writeInternal(qm.message, qm.id, timestampMillis);
completed++;
if (flush) out.flush();
}
private void writeInternal(MessageOut message, int id, long timestamp) throws IOException {
out.writeInt(MessagingService.PROTOCOL_MAGIC);
out.writeInt(id);
out.writeInt((int) timestamp);
message.serialize(out, targetVersion);
}
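The enqueue/drain split and the fixed header written by writeInternal can be modelled without sockets. In this sketch the wire is a byte array, the message body is omitted, and the class name and the MAGIC value are assumptions for illustration. Like the excerpt, it truncates the timestamp to an int.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of an outbound connection: producers enqueue, one writer drains in batches.
class OutboundQueueSketch {
    static final int MAGIC = 0xCA552DFA; // assumed placeholder for PROTOCOL_MAGIC

    static final class Queued {
        final int id;
        final long timestampMillis;

        Queued(int id, long timestampMillis) {
            this.id = id;
            this.timestampMillis = timestampMillis;
        }
    }

    private final BlockingQueue<Queued> backlog = new LinkedBlockingQueue<>();
    private final ByteArrayOutputStream wire = new ByteArrayOutputStream();
    private int completed;

    void enqueue(int id, long timestampMillis) {
        backlog.add(new Queued(id, timestampMillis));
    }

    // One iteration of the writer loop: drain up to maxBatch messages and write their headers.
    int drainOnce(int maxBatch) {
        List<Queued> drained = new ArrayList<>(maxBatch);
        backlog.drainTo(drained, maxBatch);
        DataOutputStream out = new DataOutputStream(wire);
        try {
            for (Queued qm : drained) {
                out.writeInt(MAGIC);
                out.writeInt(qm.id);
                out.writeInt((int) qm.timestampMillis); // low 32 bits only, as in the excerpt
                completed++;
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return drained.size();
    }

    int completed() { return completed; }
    int pending() { return backlog.size(); }
    int bytesOnWire() { return wire.size(); }

    public static void main(String[] args) {
        OutboundQueueSketch connection = new OutboundQueueSketch();
        for (int id = 1; id <= 3; id++) connection.enqueue(id, System.currentTimeMillis());
        connection.drainOnce(2);
        System.out.println("completed=" + connection.completed() + ", pending=" + connection.pending());
    }
}
```

The Pending and Completed columns of nodetool netstats map naturally onto pending() and completed() in this model.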

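OutboundTcpConnection threads connect to the same storage port (7000) that the receiving side listens on, so every Cassandra node has both inbound traffic (receiving requests) and outbound traffic (sending responses).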

DataModel

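OnDiskAtom (an atom as stored on disk) has two implementations: RangeTombstone and Cell. Cell in turn has several subtypes: AbstractCell, CounterCell, ExpiringCell, and DeletedCell.

These already cover the delete-related cases: a TTL produces an ExpiringCell, a delete produces a DeletedCell, and deleting a range of cells produces a RangeTombstone.

The abstract class for ordinary cells is AbstractCell, which has two broad families of implementations: BufferCell and AbstractNativeCell, for cells held on the Java heap and off-heap respectively.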

public class Row {
public final DecoratedKey key;
public final ColumnFamily cf;
}
public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry {
protected final CFMetaData metadata;
}

public class ArrayBackedSortedColumns extends ColumnFamily {
private DeletionInfo deletionInfo;
private Cell[] cells;
private int size;
private int sortedSize;
}

public interface Cell extends OnDiskAtom {
public CellName name();
public ByteBuffer value();
}
public class BufferCell extends AbstractCell {
protected final CellName name;
protected final ByteBuffer value;
protected final long timestamp;
}

public class Keyspace {
private final ConcurrentMap<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<>();
private volatile KSMetaData metadata;
}

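Both Keyspace and ColumnFamily carry metadata: KSMetaData and CFMetaData hold the keyspace-level and table-level configuration respectively.

Database: Keyspace

Table: ColumnFamily

Primary key: DecoratedKey

Row: Row

Column: Cell

A Row consists of a key and a ColumnFamily, that is, the primary key plus a family of (many) columns. A ColumnFamily is a family of Columns, and a Column is also called a Cell.

A ColumnFamily can be pictured as a two-level Map:
Map<RowKey, SortedMap<ColumnKey, ColumnValue>>

Because the outer level is a Map, looking up a given row key is fast. Because the columns are stored in sorted order, scanning a range of columns or locating a single column is efficient as well.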
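The two-level map can be written down directly with JDK collections. This is only a sketch of the mental model, with String keys and values and hypothetical method names; it says nothing about how Cassandra lays out data internally.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical model of Map<RowKey, SortedMap<ColumnKey, ColumnValue>>.
class TwoLevelMapSketch {
    private final Map<String, SortedMap<String, String>> rows = new HashMap<>();

    void put(String rowKey, String columnKey, String value) {
        rows.computeIfAbsent(rowKey, k -> new TreeMap<>()).put(columnKey, value);
    }

    // Point lookup: hash lookup on the row key, then a sorted-map lookup on the column key.
    String get(String rowKey, String columnKey) {
        SortedMap<String, String> row = rows.get(rowKey);
        return row == null ? null : row.get(columnKey);
    }

    // Column slice within one row: columns in [from, to), returned in sorted order.
    SortedMap<String, String> slice(String rowKey, String from, String to) {
        SortedMap<String, String> row = rows.get(rowKey);
        return row == null ? new TreeMap<>() : row.subMap(from, to);
    }

    public static void main(String[] args) {
        TwoLevelMapSketch table = new TwoLevelMapSketch();
        table.put("row1", "b", "2");
        table.put("row1", "a", "1");
        table.put("row1", "c", "3");
        System.out.println(table.get("row1", "b"));
        System.out.println(table.slice("row1", "a", "c"));
    }
}
```

Columns come back in key order regardless of insertion order, which is what makes slice queries cheap.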

Cell has several implementations. Besides the delete-related kinds, ordinary cells are divided into BufferCell and NativeCell.

BufferCell lives on the Java heap, while NativeCell lives off-heap.
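As a rough analogy for the on-heap versus off-heap distinction, the JDK's ByteBuffer offers both flavours. This sketch is an analogy only: the class and method names are hypothetical, and it does not claim that NativeCell is backed by a direct ByteBuffer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical analogy: the same value stored on the Java heap and outside it.
class CellStorageSketch {
    // Heap buffer: backed by a byte[] that the garbage collector manages.
    static ByteBuffer onHeap(String value) {
        return ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8));
    }

    // Direct buffer: the bytes live outside the Java heap.
    static ByteBuffer offHeap(String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocateDirect(bytes.length);
        buffer.put(bytes);
        buffer.flip(); // switch from writing to reading
        return buffer;
    }

    public static void main(String[] args) {
        ByteBuffer heap = onHeap("value");
        ByteBuffer direct = offHeap("value");
        System.out.println("heap.isDirect=" + heap.isDirect() + ", direct.isDirect=" + direct.isDirect());
        System.out.println("same content: " + heap.equals(direct));
    }
}
```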

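Below is the stack trace of a Java heap OutOfMemoryError triggered by a read. A likely cause is that the read kept pulling data into memory until the heap ran out.

The innermost Cassandra call is OnDiskAtom's deserializeFromSSTable: the data deserialized while reading an SSTable is materialized on the heap.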

ERROR 19:20:50,316 Exception in thread Thread[ReadStage:63,5,main]

java.lang.OutOfMemoryError: Java heap space
at java.nio.ByteBuffer.wrap(ByteBuffer.java:350)
at java.nio.ByteBuffer.wrap(ByteBuffer.java:373)
at org.apache.cassandra.io.util.RandomAccessReader.readBytes(RandomAccessReader.java:391)
at org.apache.cassandra.utils.ByteBufferUtil.read(ByteBufferUtil.java:392)
at org.apache.cassandra.utils.ByteBufferUtil.readWithShortLength(ByteBufferUtil.java:371)
at org.apache.cassandra.db.OnDiskAtom$Serializer.deserializeFromSSTable(OnDiskAtom.java:84)
at org.apache.cassandra.db.OnDiskAtom$Serializer.deserializeFromSSTable(OnDiskAtom.java:73)
at org.apache.cassandra.db.columniterator.IndexedSliceReader$IndexedBlockFetcher.getNextBlock(IndexedSliceReader.java:370)
at org.apache.cassandra.db.columniterator.IndexedSliceReader$IndexedBlockFetcher.fetchMoreData(IndexedSliceReader.java:325)
at org.apache.cassandra.db.columniterator.IndexedSliceReader.computeNext(IndexedSliceReader.java:151)
at org.apache.cassandra.db.columniterator.IndexedSliceReader.computeNext(IndexedSliceReader.java:48)
at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
at org.apache.cassandra.db.columniterator.SSTableSliceIterator.hasNext(SSTableSliceIterator.java:90)
at org.apache.cassandra.db.filter.QueryFilter$2.getNext(QueryFilter.java:171)
at org.apache.cassandra.db.filter.QueryFilter$2.hasNext(QueryFilter.java:154)
at org.apache.cassandra.utils.MergeIterator$Candidate.advance(MergeIterator.java:143)
at org.apache.cassandra.utils.MergeIterator$ManyToOne.advance(MergeIterator.java:122)
at org.apache.cassandra.utils.MergeIterator$ManyToOne.computeNext(MergeIterator.java:96)
at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
at org.apache.cassandra.db.filter.SliceQueryFilter.collectReducedColumns(SliceQueryFilter.java:157)
at org.apache.cassandra.db.filter.QueryFilter.collateColumns(QueryFilter.java:136)
at org.apache.cassandra.db.filter.QueryFilter.collateOnDiskAtom(QueryFilter.java:84)
at org.apache.cassandra.db.CollationController.collectAllData(CollationController.java:293)
at org.apache.cassandra.db.CollationController.getTopLevelColumns(CollationController.java:65)
at org.apache.cassandra.db.ColumnFamilyStore.getTopLevelColumns(ColumnFamilyStore.java:1357)
at org.apache.cassandra.db.ColumnFamilyStore.getColumnFamily(ColumnFamilyStore.java:1214)
at org.apache.cassandra.db.ColumnFamilyStore.getColumnFamily(ColumnFamilyStore.java:1126)
at org.apache.cassandra.db.Table.getRow(Table.java:347)
at org.apache.cassandra.db.SliceFromReadCommand.getRow(SliceFromReadCommand.java:70)
at org.apache.cassandra.service.StorageProxy$LocalReadRunnable.runMayThrow(StorageProxy.java:1052)

CREATE TABLE velocity_app (
attribute text,
partner_code text,
app_name text,
type text,
"timestamp" bigint,
event text,
sequence_id text,
PRIMARY KEY ((attribute, partner_code, app_name, type), sequence_id)
) WITH CLUSTERING ORDER BY (sequence_id DESC);

insert into velocity_app(attribute, partner_code, app_name, type,timestamp,event,sequence_id)values('zqhxuyuan','tongdun','tongdun_app','login',1111111111,'{jsondata}','1111111111-1');
select * from velocity_app where attribute='zqhxuyuan' and type='login' and partner_code='tongdun' and app_name='tongdun_app';


cqlsh:system> select * from schema_columnfamilies where keyspace_name='forseti' and columnfamily_name='velocity_app';

keyspace_name | columnfamily_name | bloom_filter_fp_chance | caching | cf_id | comment | compaction_strategy_class | compaction_strategy_options | comparator | compression_parameters | default_time_to_live | default_validator | dropped_columns | gc_grace_seconds | is_dense | key_validator | local_read_repair_chance | max_compaction_threshold | max_index_interval | memtable_flush_period_in_ms | min_compaction_threshold | min_index_interval | read_repair_chance | speculative_retry | subcomparator | type
---------------+-------------------+------------------------+---------------------------------------------+--------------------------------------+---------+-----------------------------------------------------------------+-----------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------+----------------------+-------------------------------------------+-----------------+------------------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+--------------------+-----------------------------+--------------------------+--------------------+--------------------+-------------------+---------------+----------
forseti | velocity_app | 0.01 | {"keys":"ALL", "rows_per_partition":"NONE"} | 763248f0-8f88-11e6-a6b6-71d72bc0ba41 | | org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy | {} | org.apache.cassandra.db.marshal.CompositeType(org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.UTF8Type),org.apache.cassandra.db.marshal.UTF8Type) | {"sstable_compression":"org.apache.cassandra.io.compress.LZ4Compressor"} | 0 | org.apache.cassandra.db.marshal.BytesType | null | 864000 | False | org.apache.cassandra.db.marshal.CompositeType(org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type,org.apache.cassandra.db.marshal.UTF8Type) | 0.1 | 32 | 2048 | 0 | 4 | 128 | 0 | 99.0PERCENTILE | null | Standard

(1 rows)

cqlsh:system> select * FROM schema_columns where keyspace_name='forseti' and columnfamily_name='velocity_app';

keyspace_name | columnfamily_name | column_name | component_index | index_name | index_options | index_type | type | validator
---------------+-------------------+--------------+-----------------+------------+---------------+------------+----------------+----------------------------------------------------------------------------------------
forseti | velocity_app | app_name | 2 | null | null | null | partition_key | org.apache.cassandra.db.marshal.UTF8Type
forseti | velocity_app | attribute | 0 | null | null | null | partition_key | org.apache.cassandra.db.marshal.UTF8Type
forseti | velocity_app | event | 1 | null | null | null | regular | org.apache.cassandra.db.marshal.UTF8Type
forseti | velocity_app | partner_code | 1 | null | null | null | partition_key | org.apache.cassandra.db.marshal.UTF8Type
forseti | velocity_app | sequence_id | 0 | null | null | null | clustering_key | org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.UTF8Type)
forseti | velocity_app | timestamp | 1 | null | null | null | regular | org.apache.cassandra.db.marshal.LongType
forseti | velocity_app | type | 3 | null | null | null | partition_key | org.apache.cassandra.db.marshal.UTF8Type

(7 rows)

Property          | Type
comparator        | CompositeType(ReversedType(UTF8Type),UTF8Type)
default_validator | BytesType
key_validator     | CompositeType(UTF8Type,UTF8Type,UTF8Type,UTF8Type)

Write

String schema = "CREATE TABLE myKs.myTable (k int PRIMARY KEY,v1 text,v2 int)";
String insert = "INSERT INTO myKs.myTable (k, v1, v2) VALUES (?, ?, ?)";
CQLSSTableWriter writer = CQLSSTableWriter.builder()
.inDirectory("path/to/directory")
.forTable(schema)
.using(insert).build();
writer.addRow(0, "test1", 24);
writer.addRow(1, "test2", null);
writer.addRow(2, "test3", 42);
writer.close();

Gossip

http://blog.csdn.net/FireCoder/article/details/5707539

http://blog.csdn.net/zhangzhaokun/article/details/5859760

http://wiki.apache.org/cassandra/ArchitectureGossip

http://thelastpickle.com/blog/2011/12/15/Anatomy-of-a-Cassandra-Partition.html

Tracing on

public Message.Response execute(QueryState state) {
try {
UUID tracingId = null;
if (isTracingRequested()) {
tracingId = UUIDGen.getTimeUUID();
state.prepareTracingSession(tracingId);
}
if (state.traceNextQuery()) {
state.createTracingSession();
ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
builder.put("query", query);
Tracing.instance.begin("Execute CQL3 query", state.getClientAddress(), builder.build());
}
Message.Response response = ClientState.getCQLQueryHandler().process(query, state, options, getCustomPayload());
return response;
} finally {
Tracing.instance.stopSession(); //logger.trace("request complete");
}
}
