入门客AI创业平台(我带你入门,你带我飞行)
博文笔记

memcache链接超时问题

创建时间:2014-10-20 投稿人: 浏览次数:9771



spymemcache的Timed out waiting for operation 问题探究

主要原因:在想memcache客户端添加获取数据时,主要spymemcache是基于nio异步获取的,所以当获取数据时会把任务添加任务队列等待执行(如图1),同时spymemcache也会做数据获取的链接超时验证,默认时间2500毫秒,如果超过2500就会报异常(如图2)。所以归根到底还是从缓存获取数据任务太多,后面添加的数据需要等待前面的任务完成才可以继续执行,但后面任务有时间限制,所以才会出现这个问题。


@Override
  public <T> T get(String key, Transcoder<T> tc) {
    try {
      return asyncGet(key, tc).get(operationTimeout, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      throw new RuntimeException("Interrupted waiting for value", e);
    } catch (ExecutionException e) {
      if(e.getCause() instanceof CancellationException) {
        throw (CancellationException) e.getCause();
      } else {
        throw new RuntimeException("Exception waiting for value", e);
      }
    } catch (TimeoutException e) {
      throw new OperationTimeoutException("Timeout waiting for value: "
        + buildTimeoutMessage(operationTimeout, TimeUnit.MILLISECONDS), e);
    }
  }

图0


图一

  @Override
  public <T> GetFuture<T> asyncGet(final String key, final Transcoder<T> tc) {


    final CountDownLatch latch = new CountDownLatch(1);
    final GetFuture<T> rv = new GetFuture<T>(latch, operationTimeout, key,
      executorService);
    Operation op = opFact.get(key, new GetOperation.Callback() {
      private Future<T> val;


      @Override
      public void receivedStatus(OperationStatus status) {
        rv.set(val, status);
      }


      @Override
      public void gotData(String k, int flags, byte[] data) {
        assert key.equals(k) : "Wrong key returned";
        val =
            tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize()));
      }


      @Override
      public void complete() {
        latch.countDown();
        rv.signalComplete();
      }
    });
    rv.setOperation(op);
    mconn.enqueueOperation(key, op);
    return rv;
  }

图2

  public T get(long duration, TimeUnit units) throws InterruptedException,
      TimeoutException, ExecutionException {
    if (!latch.await(duration, units)) {
      // whenever timeout occurs, continuous timeout counter will increase by 1.
      MemcachedConnection.opTimedOut(op);
      if (op != null) { // op can be null on a flush
        op.timeOut();
      }
      throw new CheckedOperationTimeoutException(
          "Timed out waiting for operation", op);
    } else {
      // continuous timeout counter will be reset
      MemcachedConnection.opSucceeded(op);
    }
    if (op != null && op.hasErrored()) {
      throw new ExecutionException(op.getException());
    }
    if (isCancelled()) {
      throw new ExecutionException(new CancellationException("Cancelled"));
    }
    if (op != null && op.isTimedOut()) {
      throw new ExecutionException(new CheckedOperationTimeoutException(
          "Operation timed out.", op));
    }


    /* TODO: re-add assertion that op.getState() == OperationState.COMPLETE */


    return objRef.get();
  }



=====================================================================

xmemcache


xmemcache的get方法源码最终还是访问这个fetch0方法,

从源码分析 xmemcache 也会 报错Timed out,大致原理和spymemcache一致



图一

@SuppressWarnings("unchecked")
private final <T> Object fetch0(final String key, final byte[] keyBytes,
final CommandType cmdType, final long timeout,
Transcoder<T> transcoder) throws InterruptedException,
TimeoutException, MemcachedException, MemcachedException {
final Command command = this.commandFactory.createGetCommand(key,
keyBytes, cmdType, this.transcoder);
this.latchWait(command, timeout, this.sendCommand(command));
command.getIoBuffer().free(); // free buffer
this.checkException(command);
CachedData data = (CachedData) command.getResult();
if (data == null) {
return null;
}
if (transcoder == null) {
transcoder = this.transcoder;
}
if (cmdType == CommandType.GETS_ONE) {
return new GetsResponse<T>(data.getCas(), transcoder.decode(data));
} else {
return transcoder.decode(data);
}
}



图2

private void latchWait(final Command cmd, final long timeout,
final Session session) throws InterruptedException,
TimeoutException {
if (cmd.getLatch().await(timeout, TimeUnit.MILLISECONDS)) {
AtomicInteger counter = this.getContinuousTimeoutCounter(session);
// reset counter.
if (counter.get() > 0) {
counter.set(0);
}
} else {
cmd.cancel();
AtomicInteger counter = this.getContinuousTimeoutCounter(session);
if (counter.incrementAndGet() > this.timeoutExceptionThreshold) {
log.warn(session
+ " exceeded continuous timeout threshold,we will close it.");
try {
// reset counter.
counter.set(0);
session.close();
} catch (Exception e) {
// ignore it.
}
}
throw new TimeoutException(
"Timed out("
+ timeout
+ " milliseconds) waiting for operation while connected to "
+ session);
}
}


3个人解决思路

1:控制memcache任务队列

2:补做异常

3:延长链接超时的设置。(但是对实际功能整体执行时间就加长)

4:返回null


声明:该文观点仅代表作者本人,入门客AI创业平台信息发布平台仅提供信息存储空间服务,如有疑问请联系rumenke@qq.com。
  • 上一篇:没有了
  • 下一篇:没有了
未上传头像