memcache链接超时问题
spymemcache的Timed out waiting for operation 问题探究
主要原因:在想memcache客户端添加获取数据时,主要spymemcache是基于nio异步获取的,所以当获取数据时会把任务添加任务队列等待执行(如图1),同时spymemcache也会做数据获取的链接超时验证,默认时间2500毫秒,如果超过2500就会报异常(如图2)。所以归根到底还是从缓存获取数据任务太多,后面添加的数据需要等待前面的任务完成才可以继续执行,但后面任务有时间限制,所以才会出现这个问题。
@Override
public <T> T get(String key, Transcoder<T> tc) {
try {
return asyncGet(key, tc).get(operationTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted waiting for value", e);
} catch (ExecutionException e) {
if(e.getCause() instanceof CancellationException) {
throw (CancellationException) e.getCause();
} else {
throw new RuntimeException("Exception waiting for value", e);
}
} catch (TimeoutException e) {
throw new OperationTimeoutException("Timeout waiting for value: "
+ buildTimeoutMessage(operationTimeout, TimeUnit.MILLISECONDS), e);
}
}
图0
图一
@Override
public <T> GetFuture<T> asyncGet(final String key, final Transcoder<T> tc) {
final CountDownLatch latch = new CountDownLatch(1);
final GetFuture<T> rv = new GetFuture<T>(latch, operationTimeout, key,
executorService);
Operation op = opFact.get(key, new GetOperation.Callback() {
private Future<T> val;
@Override
public void receivedStatus(OperationStatus status) {
rv.set(val, status);
}
@Override
public void gotData(String k, int flags, byte[] data) {
assert key.equals(k) : "Wrong key returned";
val =
tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize()));
}
@Override
public void complete() {
latch.countDown();
rv.signalComplete();
}
});
rv.setOperation(op);
mconn.enqueueOperation(key, op);
return rv;
}
图2
public T get(long duration, TimeUnit units) throws InterruptedException,
TimeoutException, ExecutionException {
if (!latch.await(duration, units)) {
// whenever timeout occurs, continuous timeout counter will increase by 1.
MemcachedConnection.opTimedOut(op);
if (op != null) { // op can be null on a flush
op.timeOut();
}
throw new CheckedOperationTimeoutException(
"Timed out waiting for operation", op);
} else {
// continuous timeout counter will be reset
MemcachedConnection.opSucceeded(op);
}
if (op != null && op.hasErrored()) {
throw new ExecutionException(op.getException());
}
if (isCancelled()) {
throw new ExecutionException(new CancellationException("Cancelled"));
}
if (op != null && op.isTimedOut()) {
throw new ExecutionException(new CheckedOperationTimeoutException(
"Operation timed out.", op));
}
/* TODO: re-add assertion that op.getState() == OperationState.COMPLETE */
return objRef.get();
}
=====================================================================
xmemcache
xmemcache的get方法源码最终还是访问这个fetch0方法,
从源码分析 xmemcache 也会 报错Timed out,大致原理和spymemcache一致
图一
@SuppressWarnings("unchecked")
private final <T> Object fetch0(final String key, final byte[] keyBytes,
final CommandType cmdType, final long timeout,
Transcoder<T> transcoder) throws InterruptedException,
TimeoutException, MemcachedException, MemcachedException {
final Command command = this.commandFactory.createGetCommand(key,
keyBytes, cmdType, this.transcoder);
this.latchWait(command, timeout, this.sendCommand(command));
command.getIoBuffer().free(); // free buffer
this.checkException(command);
CachedData data = (CachedData) command.getResult();
if (data == null) {
return null;
}
if (transcoder == null) {
transcoder = this.transcoder;
}
if (cmdType == CommandType.GETS_ONE) {
return new GetsResponse<T>(data.getCas(), transcoder.decode(data));
} else {
return transcoder.decode(data);
}
}
图2
private void latchWait(final Command cmd, final long timeout,
final Session session) throws InterruptedException,
TimeoutException {
if (cmd.getLatch().await(timeout, TimeUnit.MILLISECONDS)) {
AtomicInteger counter = this.getContinuousTimeoutCounter(session);
// reset counter.
if (counter.get() > 0) {
counter.set(0);
}
} else {
cmd.cancel();
AtomicInteger counter = this.getContinuousTimeoutCounter(session);
if (counter.incrementAndGet() > this.timeoutExceptionThreshold) {
log.warn(session
+ " exceeded continuous timeout threshold,we will close it.");
try {
// reset counter.
counter.set(0);
session.close();
} catch (Exception e) {
// ignore it.
}
}
throw new TimeoutException(
"Timed out("
+ timeout
+ " milliseconds) waiting for operation while connected to "
+ session);
}
}
3个人解决思路
1:控制memcache任务队列
2:补做异常
3:延长链接超时的设置。(但是对实际功能整体执行时间就加长)
4:返回null
- 上一篇:没有了
- 下一篇:没有了