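Contents

  • 1. Integrating Jersey with an HTTP server
  • 2. Automated HTTP testing of Jersey with JUnit
  • 3. A brief introduction to SSE
  • 4. Jersey's SSE support, tested with curl
  • 5. Verifying the long-lived SSE channel with HTML + JS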

Integrating Jersey with an HTTP server

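1. cd into the sse directory, then use Maven to download the project skeleton: run mvn archetype:generate, filter on the artifactId jersey-quickstart-grizzly2, and pick a version. The latest version, 2.26, requires Java 8, so 2.16 is chosen here. Then enter the coordinates of your own project: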

<groupId>de.tao</groupId>
<artifactId>sse</artifactId>
<version>1.0</version>

As you can see, the HTTP service is provided by the Grizzly 2 container, which plays a role similar to Jetty.

2. A look at the generated files:
(1) pom.xml already declares the Grizzly 2 dependency and is configured to run the main class. Its content is:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>de.tao</groupId>
    <artifactId>sse</artifactId>
    <packaging>jar</packaging>
    <version>1.0</version>
    <name>sse</name>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.glassfish.jersey</groupId>
                <artifactId>jersey-bom</artifactId>
                <version>${jersey.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.glassfish.jersey.containers</groupId>
            <artifactId>jersey-container-grizzly2-http</artifactId>
        </dependency>
        <!-- uncomment this to get JSON support:
         <dependency>
            <groupId>org.glassfish.jersey.media</groupId>
            <artifactId>jersey-media-moxy</artifactId>
        </dependency>
        --> 
        <!-- added for server-sent events -->
        <dependency>
            <groupId>org.glassfish.jersey.media</groupId>
            <artifactId>jersey-media-sse</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.9</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.5.1</version>
                <inherited>true</inherited>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.2.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>java</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <mainClass>de.tao.App</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <properties>
        <jersey.version>2.16</jersey.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
</project>

Note the use of dependencyManagement: because jersey-bom is imported there, the Jersey <dependency> entries no longer specify a version number.

(2) The main class that starts the REST service

package de.tao;

import org.glassfish.grizzly.http.server.HttpServer;
import org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpServerFactory;
import org.glassfish.jersey.server.ResourceConfig;

import java.io.IOException;
import java.net.URI;

/**
 * Main class.
 *
 */
public class Main {
    // Base URI the Grizzly HTTP server will listen on
    public static final String BASE_URI = "http://localhost:8081/rest/myapp/";

    /**
     * Starts Grizzly HTTP server exposing JAX-RS resources defined in this application.
     * @return Grizzly HTTP server.
     */
    public static HttpServer startServer() {
        // create a resource config that scans for JAX-RS resources and providers
        // in de.tao package
        final ResourceConfig rc = new ResourceConfig().packages("de.tao");

        // create and start a new instance of grizzly http server
        // exposing the Jersey application at BASE_URI
        return GrizzlyHttpServerFactory.createHttpServer(URI.create(BASE_URI), rc);
    }

    /**
     * Main method.
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        final HttpServer server = startServer();
        System.out.println(String.format("Jersey app started with WADL available at "
                + "%sapplication.wadl\nHit enter to stop it...", BASE_URI));
        System.in.read();
        server.stop();
    }
}

The ResourceConfig in the code specifies which package is scanned for resources.

(3) The class that provides the REST service

package de.tao;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

/**
 * Root resource (exposed at "myresource" path)
 */
@Path("myresource")
public class MyResource {

    /**
     * Method handling HTTP GET requests. The returned object will be sent
     * to the client as "text/plain" media type.
     *
     * @return String that will be returned as a text/plain response.
     */
    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String getIt() {
        return "Got it!";
    }
}

The resource is very simple: a request to /myresource returns the default content Got it!. There is also a WSDL-like description file: http://localhost:8081/rest/myapp/application.wadl

Automated testing of Jersey with JUnit

Open the test class src/test/java/de/tao/MyResourceTest.java

package de.tao;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;

import org.glassfish.grizzly.http.server.HttpServer;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;

public class MyResourceTest {

    private HttpServer server;
    private WebTarget target;

    @Before
    public void setUp() throws Exception {
        // start the server
        server = Main.startServer();
        // create the client
        Client c = ClientBuilder.newClient();

        // uncomment the following line if you want to enable
        // support for JSON in the client (you also have to uncomment
        // dependency on jersey-media-json module in pom.xml and Main.startServer())
        // --
        // c.configuration().enable(new org.glassfish.jersey.media.json.JsonJaxbFeature());

        target = c.target(Main.BASE_URI);
    }

    @After
    public void tearDown() throws Exception {
        server.stop();
    }

    /**
     * Test to see that the message "Got it!" is sent in the response.
     */
    @Test
    public void testGetIt() {
        String responseMsg = target.path("myresource").request().get(String.class);
        assertEquals("Got it!", responseMsg);
    }
}

As you can see, Jersey provides a client API that lets a test issue HTTP requests against the running server.

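Server-sent events example

Add the following classes: ServerSentEventsResource.java sends messages over a single channel, DomainResource.java broadcasts messages, and App.java starts and stops the service. This article covers only the single-channel case.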

package de.tao;

import java.io.IOException;

import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;

import org.glassfish.jersey.media.sse.EventOutput;
import org.glassfish.jersey.media.sse.OutboundEvent;
import org.glassfish.jersey.media.sse.SseFeature;

/**
 * @author Pavel Bucek (pavel.bucek at oracle.com)
 */
@Path("server-sent-events")
public class ServerSentEventsResource {

    private static volatile EventOutput eventOutput = new EventOutput();

    @GET
    @Produces(SseFeature.SERVER_SENT_EVENTS)
    public EventOutput getMessageQueue() {
        return eventOutput;
    }

    @POST
    public void addMessage(final String message) throws IOException {
        final EventOutput localOutput = eventOutput;
        if (localOutput != null) {
            localOutput.write(new OutboundEvent.Builder().name("custom-message").data(String.class, message).build());
        }
    }

    @DELETE
    public void close() throws IOException {
        final EventOutput localOutput = eventOutput;
        if (localOutput != null) {
            localOutput.close();
        }
        ServerSentEventsResource.setEventOutput(new EventOutput());
    }

    @POST
    @Path("domains/{id}")
    @Produces(SseFeature.SERVER_SENT_EVENTS)
    public EventOutput startDomain(@PathParam("id") final String id) {
        final EventOutput seq = new EventOutput();

        new Thread() {
            public void run() {
                try {
                    seq.write(new OutboundEvent.Builder().name("domain-progress")
                            .data(String.class, "starting domain " + id + " ...").build());
                    Thread.sleep(200);
                    seq.write(new OutboundEvent.Builder().name("domain-progress").data(String.class, "50%").build());
                    Thread.sleep(200);
                    seq.write(new OutboundEvent.Builder().name("domain-progress").data(String.class, "60%").build());
                    Thread.sleep(200);
                    seq.write(new OutboundEvent.Builder().name("domain-progress").data(String.class, "70%").build());
                    Thread.sleep(200);
                    seq.write(new OutboundEvent.Builder().name("domain-progress").data(String.class, "99%").build());
                    Thread.sleep(200);
                    seq.write(new OutboundEvent.Builder().name("domain-progress").data(String.class, "done").build());
                    seq.close();

                } catch (final InterruptedException | IOException e) {
                    e.printStackTrace();
                }
            }
        }.start();

        return seq;
    }

    private static void setEventOutput(final EventOutput eventOutput) {
        ServerSentEventsResource.eventOutput = eventOutput;
    }
}

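The default GET, handled by getMessageQueue, returns an end-to-end channel. The request stays open and keeps listening, somewhat like long-polling. For a comparison of these mechanisms, see here; for more on how the channel works, see this example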

package de.tao;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;

import org.glassfish.jersey.media.sse.EventOutput;
import org.glassfish.jersey.media.sse.OutboundEvent;
import org.glassfish.jersey.media.sse.SseBroadcaster;
import org.glassfish.jersey.media.sse.SseFeature;
import org.glassfish.jersey.server.ChunkedOutput;

/**
 * @author Pavel Bucek (pavel.bucek at oracle.com)
 */
@Path("domain")
public class DomainResource {

    private static final Map<Integer, Process> processes = new ConcurrentHashMap<Integer, Process>();

    @Path("start")
    @POST
    public Response post(@DefaultValue("0") @QueryParam("testSources") int testSources) {
        final Process process = new Process(testSources);
        processes.put(process.getId(), process);

        Executors.newSingleThreadExecutor().execute(process);

        final URI processIdUri = UriBuilder.fromResource(DomainResource.class).path("process/{id}").build(process.getId());
        return Response.created(processIdUri).build();
    }

    @Path("process/{id}")
    @Produces(SseFeature.SERVER_SENT_EVENTS)
    @GET
    public EventOutput getProgress(@PathParam("id") int id,
                                   @DefaultValue("false") @QueryParam("testSource") boolean testSource) {
        final Process process = processes.get(id);

        if (process != null) {
            if (testSource) {
                process.release();
            }
            final EventOutput eventOutput = new EventOutput();
            process.getBroadcaster().add(eventOutput);
            return eventOutput;
        } else {
            throw new NotFoundException();
        }
    }

    static class Process implements Runnable {

        private static final AtomicInteger counter = new AtomicInteger(0);

        private final int id;
        private final CountDownLatch latch;
        private final SseBroadcaster broadcaster = new SseBroadcaster() {
            @Override
            public void onException(ChunkedOutput<OutboundEvent> outboundEventChunkedOutput, Exception exception) {
                exception.printStackTrace();
            }
        };

        public Process(int testReceivers) {
            id = counter.incrementAndGet();
            latch = testReceivers > 0 ? new CountDownLatch(testReceivers) : null;
        }

        public int getId() {
            return id;
        }

        public SseBroadcaster getBroadcaster() {
            return broadcaster;
        }

        public boolean release() {
            if (latch == null) {
                return false;
            }

            latch.countDown();
            return true;
        }

        public void run() {
            try {
                if (latch != null) {
                    // wait for all test EventSources to be registered
                    latch.await(5, TimeUnit.SECONDS);
                }

                broadcaster.broadcast(
                        new OutboundEvent.Builder().name("domain-progress").data(String.class, "starting domain " + id + " ...")
                                .build());
                broadcaster.broadcast(new OutboundEvent.Builder().name("domain-progress").data(String.class, "50%").build());
                broadcaster.broadcast(new OutboundEvent.Builder().name("domain-progress").data(String.class, "60%").build());
                broadcaster.broadcast(new OutboundEvent.Builder().name("domain-progress").data(String.class, "70%").build());
                broadcaster.broadcast(new OutboundEvent.Builder().name("domain-progress").data(String.class, "99%").build());
                broadcaster.broadcast(new OutboundEvent.Builder().name("domain-progress").data(String.class, "done").build());
                broadcaster.closeAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Start the service:

package de.tao;


import java.io.IOException;
import java.net.URI;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpServerFactory;
import org.glassfish.jersey.media.sse.SseFeature;
import org.glassfish.jersey.server.ResourceConfig;

import org.glassfish.grizzly.http.server.HttpServer;

/**
 * Server sent event example.
 *
 * @author Pavel Bucek (pavel.bucek at oracle.com)
 */
public class App {

    private static final URI BASE_URI = URI.create("http://localhost:8080/rest/");
    public static final String ROOT_PATH = "server-sent-events";

    public static void main(String[] args) {
        try {
            System.out.println("\"Server-Sent Events\" Jersey Example App");

            final ResourceConfig resourceConfig = new ResourceConfig(ServerSentEventsResource.class, SseFeature.class);

            final HttpServer server = GrizzlyHttpServerFactory.createHttpServer(BASE_URI, resourceConfig, false);
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                @Override
                public void run() {
                    server.shutdownNow();
                }
            }));
            server.start();

            System.out.println(String.format("Application started.\nTry out %s%s\nStop the application using CTRL+C",
                    BASE_URI, ROOT_PATH));

            Thread.currentThread().join();
        } catch (IOException | InterruptedException ex) {
            Logger.getLogger(App.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}

Testing:

  • Start the server: mvn clean compile exec:java
  • Open the channel: curl -v http://localhost:8080/rest/server-sent-events -H "Accept: text/event-stream"
  • In another shell, push something into the channel: curl -v -d "abc" http://localhost:8080/rest/server-sent-events -H "Content-Type: text/plain". The first shell then receives event: custom-message. The exact content of the event is not examined here; the HTML + JS section later in this article looks into it.
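To make the curl output easier to read, here is a minimal sketch of the text/event-stream framing as defined by the Server-Sent Events specification: an optional event: line, one data: line per line of payload, and a blank line that ends the event. SseWireFormat and format are hypothetical names made up for this illustration and are not part of Jersey; the bytes Jersey actually writes may differ in small details.

```java
/**
 * Hypothetical helper (not a Jersey class): renders one event in the
 * text/event-stream wire format described by the SSE specification.
 */
final class SseWireFormat {

    /** Formats an event; name may be null for an unnamed ("message") event. */
    static String format(String name, String data) {
        StringBuilder sb = new StringBuilder();
        if (name != null) {
            sb.append("event: ").append(name).append('\n');
        }
        // Every line of the payload is sent as its own "data:" field.
        for (String line : data.split("\r\n|\r|\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        // A blank line marks the end of the event.
        return sb.append('\n').toString();
    }

    public static void main(String[] args) {
        // Roughly what the listening curl should show after posting "abc".
        System.out.print(format("custom-message", "abc"));
    }
}
```

Under this framing, the event: custom-message line seen in the first shell should be followed by a data: line carrying the posted body.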

Using server-sent events in an HTML page

Add the following line to App.java:

server.getServerConfiguration().addHttpHandler(new CLStaticHttpHandler(new URLClassLoader(new URL[] {new URL("file:///Users/tao/dev/maventest/sse/htmls/")})), "/sse");

For more applications of this kind, see here

Create an htmls directory containing index.html with the following content:

<html>
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=gb2312" />
    <script type="text/javascript">
    // Check whether the browser supports EventSource
    if (typeof (EventSource) !== "undefined") {
        var source = new EventSource("/rest/server-sent-events");
        // Fired when the connection to the server is opened
        source.onopen = function(event) {
            console.log("open!");
        };
        // Fired only for unnamed events and events named "message"
        source.onmessage = function(event) {
            console.log(event.data);
        };
        // Named events need addEventListener; the resource sends "custom-message"
        source.addEventListener('custom-message', function(event) {
            console.log(event.data);
        });
        // Fired when an error occurs
        source.onerror = function(event) {
            console.log("error!");
        };
    } else {
        console.log("Sorry, your browser does not support server-sent events...");
    }
    </script>
</head>
<body>
    <p>
        Jersey server sent events
    </p>
</body>
</html>

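Visit http://localhost:8080/sse to open the HTML page. In the browser's developer tools you can see that the channel has been opened and stays open. Push a message with curl twice and the page receives both. When several browsers, or curl, access this REST service, the most recent client replaces the earlier ones.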
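What the browser's EventSource does with the open channel can be sketched in plain Java: it reads the stream line by line, collects the event: and data: fields, and dispatches an event at each blank line. Events without an event: field get the type message, which is why onmessage only sees those. SseStreamParser is a hypothetical name for this illustration; the sketch ignores the id: and retry: fields and is not a complete implementation of the specification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;

/** Hypothetical sketch of client-side parsing of a text/event-stream body. */
final class SseStreamParser {

    /** Reads until the stream ends; each result is {eventName, data}. */
    static List<String[]> parse(Readable in) {
        List<String[]> events = new ArrayList<String[]>();
        Scanner lines = new Scanner(in);
        String name = "message";              // default type when no "event:" field is sent
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        while (lines.hasNextLine()) {
            String line = lines.nextLine();
            if (line.isEmpty()) {
                // Blank line: dispatch the pending event, then reset the buffers.
                if (hasData) {
                    events.add(new String[] {name, data.toString()});
                }
                name = "message";
                data.setLength(0);
                hasData = false;
            } else if (line.charAt(0) != ':') { // lines starting with ':' are comments
                int colon = line.indexOf(':');
                String field = colon < 0 ? line : line.substring(0, colon);
                String value = colon < 0 ? "" : line.substring(colon + 1);
                if (value.startsWith(" ")) {
                    value = value.substring(1); // one leading space is not part of the value
                }
                if (field.equals("event")) {
                    name = value;
                } else if (field.equals("data")) {
                    if (hasData) {
                        data.append('\n');      // multiple data lines are joined with newlines
                    }
                    data.append(value);
                    hasData = true;
                }
                // Other fields (id, retry) are ignored in this sketch.
            }
        }
        return events;
    }
}
```

Passing a reader over the HTTP response body to parse would block between events for as long as the server keeps the channel open, which matches the long-lived request seen in the developer tools.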

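Other observations:
(1) Wherever the request comes from, the single EventOutput is taken over by the latest client; supporting multiple users requires maintaining a list of sessions.
(2) The EventOutput field must be declared static, otherwise sending fails. Presumably the GET and POST requests are served by two different resource instances (JAX-RS resource classes are instantiated per request by default).
(3) If the browser is closed abruptly, addMessage throws a broken pipe exception. At that moment the EventOutput is not yet marked as closed; its state is only updated a few seconds later. Because the field is static, the eventOutput that was handed to the client is already broken. In production this exception has to be caught and handled.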
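Points (1) and (3) can be combined into one small sketch: keep one channel per session in a concurrent map and drop a channel as soon as a write fails. ChannelRegistry and its Channel interface are hypothetical stand-ins invented for this illustration (Channel plays the role of an EventOutput, whose write and close methods also throw IOException); Jersey's own SseBroadcaster, used in DomainResource, addresses a similar need.

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical session list: one open channel per session id. */
final class ChannelRegistry {

    /** Stand-in for an open channel such as an EventOutput (assumption). */
    interface Channel {
        void send(String message) throws IOException;
        void close() throws IOException;
    }

    private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>();

    /** Registers a channel; an older channel of the same session is closed. */
    void register(String sessionId, Channel channel) {
        Channel previous = channels.put(sessionId, channel);
        if (previous != null) {
            closeQuietly(previous);
        }
    }

    /** Sends to every session and returns how many deliveries succeeded. */
    int broadcast(String message) {
        int delivered = 0;
        for (Map.Entry<String, Channel> entry : channels.entrySet()) {
            try {
                entry.getValue().send(message);
                delivered++;
            } catch (IOException broken) {
                // For example a broken pipe after the browser was closed:
                // forget the channel so later messages skip it.
                channels.remove(entry.getKey(), entry.getValue());
                closeQuietly(entry.getValue());
            }
        }
        return delivered;
    }

    int size() {
        return channels.size();
    }

    private static void closeQuietly(Channel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // The channel is already unusable; nothing more to do.
        }
    }
}
```

In the resource class, the GET handler would register a new channel under some session key and the POST handler would call broadcast instead of writing to a single static field; how the session key is chosen is left open here.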