XNIO

undertow 是以XNIO为实现核心。XNIO有两个核心概念:

Channel

Channel,是传输管道的抽象概念,在NIO的Channel上进行的扩展加强,使用ChannelListener API进行事件通知。在创建Channel时,就赋予IO线程,用于执行所有的ChannelListener回调方法。

IOWorker

区分IO线程和工作线程,创建一个工作线程池可以用来执行阻塞任务。一般情况下,非阻塞的Handler由IO线程执行,而阻塞任务比如Servlet则被调度到工作线程池执行。这样就很好的区分了阻塞和非阻塞的两种情形。

  • WORKER_IO_THREADS, IO thread处理非阻塞任务,要保证不做阻塞操作,因为很多连接同时用到这类线程,类似于nodejs中的loop,这个线程只要有任务就去执行,实际配置时每个CPU一个线程比较好。
  • WORKER_TASK_CORE_THREADS,用于执行阻塞任务,从线程池中获得,任务完成后返回到线程池中。因为不同应用对应的服务器负载不同,所以不易给出具体数值,一般建议每个CPU core设置10个。

我们知道NIO的基本要求是不阻塞当前线程的执行,对于非阻塞请求的结果,可以用两种方式获得:一种是对于请求很快返回一个引用(如JDK中Future,XNIO中称为IoFuture,其中很多方法是类似的),过一段时间再查询结果;还有一种是当结果就绪时,调用事先注册的回调方法来通知(如NIO2的CompletionHandler,XNIO的ChannelListener)。显而易见后者效率更高一些,避免了数据未就绪情景下的无用处理过程。但JDK7之前无法将函数作为方法参数,所以只能用Java的匿名内部类来模拟函数式方法,造成代码嵌套层次过多,难以理解和维护,所以Netty和XNIO这样的框架通过调度方法调用过程,简化了编程工作。

XNIO和Netty都对ByteBuffer进行池化管理,简单来说就是开发者在程序开始时就计划好读写缓存区大小,统一分配好放到池中,Xnio中有Pool和Pooled接口用来管理池化缓存区。开发过高并发应用就知道,JVM GC经常出现并难以控制是很头疼的问题。我们通常在接收网络数据时,往往简单的new出一块数据区,填充,解析,使用,最后丢弃,这种方法随着大量的数据读入,必然造成GC反复出现。重用缓存区就可以在这个方面解决一部分问题。

和Netty的ChannelHandler不同,XNIO对应的ChannelListener只有一个方法handleEvent(),也就意味着所有的事件都要经由这个方法。在实际实行过程中,会进行若干状态机的转变,比如在服务器端,开始时accept状态就绪,当连接建立后转变为可读或者可写状态。请参见下面的例子。

除了ChannelIOWorker 两个重要的基础,XNIO还提供了SSL支持

undertow core

Listeners

目前undertow中支持的Listner类型主要有

  • HTTP
  • HTTPS
  • AJP
  • HTTP2

Handler

undertow原生提供了io.undertow.server.HttpHandler,接口定义比较简单:

public interface HttpHandler {
    void handleRequest(HttpServerExchange exchange) throws Exception;
}

undertow中并没有pipeline的概念,但是可以在构建hanlder时指定next,如下代码:

public class SetHeaderHandler implements HttpHandler {
    private final HttpString header;
    private final String value;
    private final HttpHandler next;
    public SetHeaderHandler(final HttpHandler next, final String header, final String value) {
        this.next = next;
        this.value = value;
        this.header = new HttpString(header);
    }
    @Override
    public void handleRequest(final HttpServerExchange exchange) throws Exception {
        exchange.getResponseHeaders().put(header, value);
        next.handleRequest(exchange);
    }
}

或者类似下面的方式,只做自身逻辑的处理,不做传递:

public class FilterHandler implements HttpHandler {

    private final Map<DispatcherType, List<ManagedFilter>> filters;
    private final Map<DispatcherType, Boolean> asyncSupported;
    private final boolean allowNonStandardWrappers;

    private final HttpHandler next;

    public FilterHandler(final Map<DispatcherType, List<ManagedFilter>> filters, final boolean allowNonStandardWrappers, final HttpHandler next) {
        this.allowNonStandardWrappers = allowNonStandardWrappers;
        this.next = next;
        this.filters = new EnumMap<>(filters);
        Map<DispatcherType, Boolean> asyncSupported = new EnumMap<>(DispatcherType.class);
        for(Map.Entry<DispatcherType, List<ManagedFilter>> entry : filters.entrySet()) {
            boolean supported = true;
            for(ManagedFilter i : entry.getValue()) {
                if(!i.getFilterInfo().isAsyncSupported()) {
                    supported = false;
                    break;
                }
            }
            asyncSupported.put(entry.getKey(), supported);
        }
        this.asyncSupported = asyncSupported;
    }

    @Override
    public void handleRequest(final HttpServerExchange exchange) throws Exception {
        final ServletRequestContext servletRequestContext = exchange.getAttachment(ServletRequestContext.ATTACHMENT_KEY);
        ServletRequest request = servletRequestContext.getServletRequest();
        ServletResponse response = servletRequestContext.getServletResponse();
        DispatcherType dispatcher = servletRequestContext.getDispatcherType();
        Boolean supported = asyncSupported.get(dispatcher);
        if(supported != null && ! supported) {
            exchange.putAttachment(AsyncContextImpl.ASYNC_SUPPORTED, false    );
        }

        final List<ManagedFilter> filters = this.filters.get(dispatcher);
        if(filters == null) {
            next.handleRequest(exchange);
        } else {
            final FilterChainImpl filterChain = new FilterChainImpl(exchange, filters, next, allowNonStandardWrappers);
            filterChain.doFilter(request, response);
        }
    }

组装服务器

1.创建XNIO Workder
2.创建 XNIO SSL实例
3.Create an instance of the relevant Undertow listener class
4.Open a server socket using XNIO and set its accept listener

Xnio xnio = Xnio.getInstance();
XnioWorker worker = xnio.createWorker(OptionMap.builder()
        .set(Options.WORKER_IO_THREADS, ioThreads)
        .set(Options.WORKER_TASK_CORE_THREADS, workerThreads)
        .set(Options.WORKER_TASK_MAX_THREADS, workerThreads)
        .set(Options.TCP_NODELAY, true)
        .getMap());
OptionMap socketOptions = OptionMap.builder()
        .set(Options.WORKER_IO_THREADS, ioThreads)
        .set(Options.TCP_NODELAY, true)
        .set(Options.REUSE_ADDRESSES, true)
        .getMap();
Pool<ByteBuffer> buffers = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR,bufferSize, bufferSize * buffersPerRegion);
if (listener.type == ListenerType.AJP) {
    AjpOpenListener openListener = new AjpOpenListener(buffers, serverOptions, bufferSize);
    openListener.setRootHandler(rootHandler);
    ChannelListener<AcceptingChannel<StreamConnection>> acceptListener = ChannelListeners.openListenerAdapter(openListener);
    AcceptingChannel<? extends StreamConnection> server = worker.createStreamConnectionServer(new InetSocketAddress(Inet4Address.getByName(listener.host), listener.port), acceptListener, socketOptions);
    server.resumeAccepts();
} else if (listener.type == ListenerType.HTTP) {
    HttpOpenListener openListener = new HttpOpenListener(buffers, OptionMap.builder().set(UndertowOptions.BUFFER_PIPELINED_DATA, true).addAll(serverOptions).getMap(), bufferSize);
    openListener.setRootHandler(rootHandler);
    ChannelListener<AcceptingChannel<StreamConnection>> acceptListener = ChannelListeners.openListenerAdapter(openListener);
    AcceptingChannel<? extends StreamConnection> server = worker.createStreamConnectionServer(new InetSocketAddress(Inet4Address.getByName(listener.host), listener.port), acceptListener, socketOptions);
    server.resumeAccepts();
} else if (listener.type == ListenerType.HTTPS){
    HttpOpenListener openListener = new HttpOpenListener(buffers, OptionMap.builder().set(UndertowOptions.BUFFER_PIPELINED_DATA, true).addAll(serverOptions).getMap(), bufferSize);
    openListener.setRootHandler(rootHandler);
    ChannelListener<AcceptingChannel<StreamConnection>> acceptListener = ChannelListeners.openListenerAdapter(openListener);
    XnioSsl xnioSsl;
    if(listener.sslContext != null) {
        xnioSsl = new JsseXnioSsl(xnio, OptionMap.create(Options.USE_DIRECT_BUFFERS, true), listener.sslContext);
    } else {
        xnioSsl = xnio.getSslProvider(listener.keyManagers, listener.trustManagers, OptionMap.create(Options.USE_DIRECT_BUFFERS, true));
    }
    AcceptingChannel <SslConnection> sslServer = xnioSsl.createSslConnectionServer(worker, new InetSocketAddress(Inet4Address.getByName(listener.host), listener.port), (ChannelListener) acceptListener, socketOptions);
    sslServer.resumeAccepts();
}

undertow servlet

类似与jetty,undertow 也可以部署采用部署war包的方式启动项目:

DeploymentInfo servletBuilder = Servlets.deployment()
        .setClassLoader(ServletServer.class.getClassLoader())
        .setContextPath("/myapp")
        .setDeploymentName("test.war")
        .addServlets(
                Servlets.servlet("MessageServlet", MessageServlet.class)
                        .addInitParam("message", "Hello World")
                        .addMapping("/*"),
                Servlets.servlet("MyServlet", MessageServlet.class)
                        .addInitParam("message", "MyServlet")
                        .addMapping("/myservlet"));

DeploymentManager manager = Servlets.defaultContainer().addDeployment(servletBuilder);
manager.deploy();
PathHandler path = Handlers.path(Handlers.redirect("/myapp"))
        .addPrefixPath("/myapp", manager.start());

Undertow server = Undertow.builder()
        .addHttpListener(8080, "localhost")
        .setHandler(path)
        .build();
server.start();

Refernces

Undertow服务器基础分析 - 概述
Undertow服务器基础分析 - XNIO
Undertow服务器基础分析
undertow cookdoc