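Contents
- 1. Integrating Jersey with an HTTP server
- 2. Automated HTTP testing with Jersey and JUnit
- 3. A brief introduction to SSE
- 4. Jersey's SSE support, tested with curl
- 5. Verifying the long-lived SSE channel with HTML + JS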
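Integrating Jersey with an HTTP server
1. cd sse
, then download the project skeleton with Maven: mvn archetype:generate
. When prompted for a filter, enter the artifactId jersey-quickstart-grizzly2
and pick a version. The latest release, 2.26, requires Java 8, so 2.16 is enough here. Then enter the coordinates of your own project: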
<groupId>de.tao</groupId>
<artifactId>sse</artifactId>
<version>1.0</version>
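As you can see, HTTP is served by the grizzly2 container, which is similar to Jetty.
2. A look at the generated files:
(1) pom.xml already includes the grizzly2 dependency and is already configured to run a main class. Its contents: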
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>de.tao</groupId>
<artifactId>sse</artifactId>
<packaging>jar</packaging>
<version>1.0</version>
<name>sse</name>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.glassfish.jersey</groupId>
<artifactId>jersey-bom</artifactId>
<version>${jersey.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.glassfish.jersey.containers</groupId>
<artifactId>jersey-container-grizzly2-http</artifactId>
</dependency>
<!-- uncomment this to get JSON support:
<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-moxy</artifactId>
</dependency>
-->
<!-- added for server-sent events -->
<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-sse</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.9</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.5.1</version>
<inherited>true</inherited>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>java</goal>
</goals>
</execution>
</executions>
<configuration>
<mainClass>de.tao.App</mainClass>
</configuration>
</plugin>
</plugins>
</build>
<properties>
<jersey.version>2.16</jersey.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>
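The use of dependencyManagement is worth understanding: because the imported jersey-bom manages the versions, the individual <dependency> entries for Jersey artifacts no longer specify a version number.
(2) The main class that starts the REST service: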
package de.tao;
import org.glassfish.grizzly.http.server.HttpServer;
import org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpServerFactory;
import org.glassfish.jersey.server.ResourceConfig;
import java.io.IOException;
import java.net.URI;
/**
* Main class.
*
*/
public class Main {
// Base URI the Grizzly HTTP server will listen on
public static final String BASE_URI = "http://localhost:8081/rest/myapp/";
/**
* Starts Grizzly HTTP server exposing JAX-RS resources defined in this application.
* @return Grizzly HTTP server.
*/
public static HttpServer startServer() {
// create a resource config that scans for JAX-RS resources and providers
// in de.tao package
final ResourceConfig rc = new ResourceConfig().packages("de.tao");
// create and start a new instance of grizzly http server
// exposing the Jersey application at BASE_URI
return GrizzlyHttpServerFactory.createHttpServer(URI.create(BASE_URI), rc);
}
/**
* Main method.
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
final HttpServer server = startServer();
System.out.println(String.format("Jersey app started with WADL available at "
+ "%sapplication.wadl\nHit enter to stop it...", BASE_URI));
System.in.read();
server.stop();
}
}
The resource package is specified in code through ResourceConfig.
(3) The class that provides the REST service:
package de.tao;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
/**
* Root resource (exposed at "myresource" path)
*/
@Path("myresource")
public class MyResource {
/**
* Method handling HTTP GET requests. The returned object will be sent
* to the client as "text/plain" media type.
*
* @return String that will be returned as a text/plain response.
*/
@GET
@Produces(MediaType.TEXT_PLAIN)
public String getIt() {
return "Got it!";
}
}
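The resource is very simple: a request to /myresource returns the default content Got it!
. There is also a WSDL-like description file. With the BASE_URI shown in Main above it is served at http://localhost:8081/rest/myapp/application.wadl
Automated testing with Jersey and JUnit
Open the test class src/test/java/de/tao/MyResourceTest.java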
package de.tao;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import org.glassfish.grizzly.http.server.HttpServer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class MyResourceTest {
private HttpServer server;
private WebTarget target;
@Before
public void setUp() throws Exception {
// start the server
server = Main.startServer();
// create the client
Client c = ClientBuilder.newClient();
// uncomment the following line if you want to enable
// support for JSON in the client (you also have to uncomment
// dependency on jersey-media-json module in pom.xml and Main.startServer())
// --
// c.configuration().enable(new org.glassfish.jersey.media.json.JsonJaxbFeature());
target = c.target(Main.BASE_URI);
}
@After
public void tearDown() throws Exception {
server.stop();
}
/**
* Test to see that the message "Got it!" is sent in the response.
*/
@Test
public void testGetIt() {
String responseMsg = target.path("myresource").request().get(String.class);
assertEquals("Got it!", responseMsg);
}
}
As you can see, Jersey lets a test act as an HTTP client and send real requests to the server it has just started.
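The cycle in that test (start a server, send a GET, compare the body) can also be tried with nothing but the JDK. The sketch below is not Jersey code: com.sun.net.httpserver.HttpServer stands in for Grizzly and HttpURLConnection stands in for the JAX-RS client. The class name RoundTripSketch and the path /myapp/myresource are chosen only for illustration, and the code assumes Java 9 or later.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.charset.StandardCharsets;

class RoundTripSketch {
    /** Starts a throwaway JDK HTTP server on a free port (stand-in for Main.startServer()). */
    static HttpServer startServer() throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
        server.createContext("/myapp/myresource", exchange -> {
            byte[] body = "Got it!".getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().add("Content-Type", "text/plain");
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        return server;
    }

    /** Sends a GET request and returns the response body as a string. */
    static String get(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
        try (InputStream in = conn.getInputStream()) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } finally {
            conn.disconnect();
        }
    }

    /** Set up, one request, tear down: the same shape as setUp/testGetIt/tearDown. */
    static String fetchOnce() {
        try {
            HttpServer server = startServer();
            try {
                int port = server.getAddress().getPort();
                return get("http://localhost:" + port + "/myapp/myresource");
            } finally {
                server.stop(0);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchOnce());
    }
}
```

Binding to port 0 lets the operating system pick a free port, so repeated test runs do not collide on a fixed port.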
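Server-sent events example
Add the following classes: ServerSentEventsResource.java
sends messages over a single channel (unicast), DomainResource.java
broadcasts messages, and App.java
starts and stops the service. This article only covers unicast.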
package de.tao;
import java.io.IOException;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import org.glassfish.jersey.media.sse.EventOutput;
import org.glassfish.jersey.media.sse.OutboundEvent;
import org.glassfish.jersey.media.sse.SseFeature;
/**
* @author Pavel Bucek (pavel.bucek at oracle.com)
*/
@Path("server-sent-events")
public class ServerSentEventsResource {
private static volatile EventOutput eventOutput = new EventOutput();
@GET
@Produces(SseFeature.SERVER_SENT_EVENTS)
public EventOutput getMessageQueue() {
return eventOutput;
}
@POST
public void addMessage(final String message) throws IOException {
final EventOutput localOutput = eventOutput;
if (localOutput != null) {
// write through the local copy that was null-checked above
localOutput.write(new OutboundEvent.Builder().name("custom-message").data(String.class, message).build());
}
}
@DELETE
public void close() throws IOException {
final EventOutput localOutput = eventOutput;
if (localOutput != null) {
// close the local copy that was null-checked above
localOutput.close();
}
ServerSentEventsResource.setEventOutput(new EventOutput());
}
@POST
@Path("domains/{id}")
@Produces(SseFeature.SERVER_SENT_EVENTS)
public EventOutput startDomain(@PathParam("id") final String id) {
final EventOutput seq = new EventOutput();
new Thread() {
public void run() {
try {
seq.write(new OutboundEvent.Builder().name("domain-progress")
.data(String.class, "starting domain " + id + " ...").build());
Thread.sleep(200);
seq.write(new OutboundEvent.Builder().name("domain-progress").data(String.class, "50%").build());
Thread.sleep(200);
seq.write(new OutboundEvent.Builder().name("domain-progress").data(String.class, "60%").build());
Thread.sleep(200);
seq.write(new OutboundEvent.Builder().name("domain-progress").data(String.class, "70%").build());
Thread.sleep(200);
seq.write(new OutboundEvent.Builder().name("domain-progress").data(String.class, "99%").build());
Thread.sleep(200);
seq.write(new OutboundEvent.Builder().name("domain-progress").data(String.class, "done").build());
seq.close();
} catch (final InterruptedException | IOException e) {
e.printStackTrace();
}
}
}.start();
return seq;
}
private static void setEventOutput(final EventOutput eventOutput) {
ServerSentEventsResource.eventOutput = eventOutput;
}
}
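The eventOutput field in this class is static. By default JAX-RS creates a new instance of a resource class for every request, so a channel kept in an instance field would not be shared between the GET that opens it and the POST that writes to it. The sketch below imitates that lifecycle with plain Java. A list stands in for the channel, and all class and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class PerRequestSketch {
    /** Hypothetical resource that keeps its channel in an instance field. */
    static class InstanceFieldResource {
        private final List<String> channel = new CopyOnWriteArrayList<>();
        List<String> get() { return channel; }
        void post(String message) { channel.add(message); }
    }

    /** Hypothetical resource that keeps its channel in a static field. */
    static class StaticFieldResource {
        private static final List<String> CHANNEL = new CopyOnWriteArrayList<>();
        List<String> get() { return CHANNEL; }
        void post(String message) { CHANNEL.add(message); }
    }

    /** GET and POST are each served by a fresh instance, as a per-request container does. */
    static int deliveredWithInstanceField() {
        List<String> clientChannel = new InstanceFieldResource().get(); // the GET request
        new InstanceFieldResource().post("abc");                        // the POST request
        return clientChannel.size(); // the POST wrote to a different list
    }

    static int deliveredWithStaticField() {
        StaticFieldResource.CHANNEL.clear(); // reset so the demo is repeatable
        List<String> clientChannel = new StaticFieldResource().get();   // the GET request
        new StaticFieldResource().post("abc");                          // the POST request
        return clientChannel.size(); // both instances share one list
    }

    public static void main(String[] args) {
        System.out.println("instance field delivered: " + deliveredWithInstanceField());
        System.out.println("static field delivered:   " + deliveredWithStaticField());
    }
}
```

With the instance field nothing reaches the channel handed to the client; with the static field the message does.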
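The default GET, handled by getMessageQueue, returns an end-to-end channel. The request stays open and keeps listening, somewhat like long polling. For a comparison of these mechanisms, see here; for more on how channels work, see this example: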
package de.tao;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import org.glassfish.jersey.media.sse.EventOutput;
import org.glassfish.jersey.media.sse.OutboundEvent;
import org.glassfish.jersey.media.sse.SseBroadcaster;
import org.glassfish.jersey.media.sse.SseFeature;
import org.glassfish.jersey.server.ChunkedOutput;
/**
* @author Pavel Bucek (pavel.bucek at oracle.com)
*/
@Path("domain")
public class DomainResource {
private static final Map<Integer, Process> processes = new ConcurrentHashMap<Integer, Process>();
@Path("start")
@POST
public Response post(@DefaultValue("0") @QueryParam("testSources") int testSources) {
final Process process = new Process(testSources);
processes.put(process.getId(), process);
Executors.newSingleThreadExecutor().execute(process);
final URI processIdUri = UriBuilder.fromResource(DomainResource.class).path("process/{id}").build(process.getId());
return Response.created(processIdUri).build();
}
@Path("process/{id}")
@Produces(SseFeature.SERVER_SENT_EVENTS)
@GET
public EventOutput getProgress(@PathParam("id") int id,
@DefaultValue("false") @QueryParam("testSource") boolean testSource) {
final Process process = processes.get(id);
if (process != null) {
if (testSource) {
process.release();
}
final EventOutput eventOutput = new EventOutput();
process.getBroadcaster().add(eventOutput);
return eventOutput;
} else {
throw new NotFoundException();
}
}
static class Process implements Runnable {
private static final AtomicInteger counter = new AtomicInteger(0);
private final int id;
private final CountDownLatch latch;
private final SseBroadcaster broadcaster = new SseBroadcaster() {
@Override
public void onException(ChunkedOutput<OutboundEvent> outboundEventChunkedOutput, Exception exception) {
exception.printStackTrace();
}
};
public Process(int testReceivers) {
id = counter.incrementAndGet();
latch = testReceivers > 0 ? new CountDownLatch(testReceivers) : null;
}
public int getId() {
return id;
}
public SseBroadcaster getBroadcaster() {
return broadcaster;
}
public boolean release() {
if (latch == null) {
return false;
}
latch.countDown();
return true;
}
public void run() {
try {
if (latch != null) {
// wait for all test EventSources to be registered
latch.await(5, TimeUnit.SECONDS);
}
broadcaster.broadcast(
new OutboundEvent.Builder().name("domain-progress").data(String.class, "starting domain " + id + " ...")
.build());
broadcaster.broadcast(new OutboundEvent.Builder().name("domain-progress").data(String.class, "50%").build());
broadcaster.broadcast(new OutboundEvent.Builder().name("domain-progress").data(String.class, "60%").build());
broadcaster.broadcast(new OutboundEvent.Builder().name("domain-progress").data(String.class, "70%").build());
broadcaster.broadcast(new OutboundEvent.Builder().name("domain-progress").data(String.class, "99%").build());
broadcaster.broadcast(new OutboundEvent.Builder().name("domain-progress").data(String.class, "done").build());
broadcaster.closeAll();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Starting the service:
package de.tao;
import java.io.IOException;
import java.net.URI;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpServerFactory;
import org.glassfish.jersey.media.sse.SseFeature;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.grizzly.http.server.HttpServer;
/**
* Server sent event example.
*
* @author Pavel Bucek (pavel.bucek at oracle.com)
*/
public class App {
private static final URI BASE_URI = URI.create("http://localhost:8080/rest/");
public static final String ROOT_PATH = "server-sent-events";
public static void main(String[] args) {
try {
System.out.println("\"Server-Sent Events\" Jersey Example App");
final ResourceConfig resourceConfig = new ResourceConfig(ServerSentEventsResource.class, SseFeature.class);
final HttpServer server = GrizzlyHttpServerFactory.createHttpServer(BASE_URI, resourceConfig, false);
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
server.shutdownNow();
}
}));
server.start();
System.out.println(String.format("Application started.\nTry out %s%s\nStop the application using CTRL+C",
BASE_URI, ROOT_PATH));
Thread.currentThread().join();
} catch (IOException | InterruptedException ex) {
Logger.getLogger(App.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
Testing:
- Start the server
mvn clean compile exec:java
- Open the channel
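curl -v http://localhost:8080/rest/server-sent-events -H "Accept: text/event-stream"
- In another shell, push something into the channel (the URL follows the BASE_URI http://localhost:8080/rest/ defined in App)
curl -v -d "abc" http://localhost:8080/rest/server-sent-events -H "Content-Type: text/plain"
. The first shell then receives event: custom-message
. The payload is not examined here; it is explored later with HTML + JS.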
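For orientation, the text/event-stream format is line based: an optional event: line, one data: line per line of payload, and a blank line that ends the event. The following is a minimal sketch of that framing in plain Java. It is not Jersey's writer, the class name SseEventFormat is hypothetical, and the exact whitespace Jersey emits may differ.

```java
class SseEventFormat {
    /**
     * Frames one event in text/event-stream syntax.
     * A null name produces an unnamed event, which clients treat as "message".
     */
    static String format(String name, String data) {
        StringBuilder sb = new StringBuilder();
        if (name != null) {
            sb.append("event: ").append(name).append('\n');
        }
        // Every line of the payload gets its own "data:" field.
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        // A blank line terminates the event.
        return sb.append('\n').toString();
    }

    public static void main(String[] args) {
        System.out.print(format("custom-message", "abc"));
    }
}
```

Under these assumptions, the curl POST of "abc" would correspond to an event: custom-message line followed by a data: abc line and an empty line.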
Using server-sent events from an HTML page
Add the following line to App.java:
server.getServerConfiguration().addHttpHandler(new CLStaticHttpHandler(new URLClassLoader(new URL[] {new URL("file:///Users/tao/dev/maventest/sse/htmls/")})), "/sse");
For more applications of this kind, see here.
Create an htmls directory containing index.html with the following content:
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=gb2312" />
<script type="text/javascript">
// check whether the browser supports EventSource
if (typeof (EventSource) !== "undefined") {
var source = new EventSource("rest/server-sent-events");
// when the connection to the server is opened
source.onopen = function(event) {
console.log("open!");
};
// when a message is received; onmessage only fires for events named "message" (or unnamed events)
source.onmessage = function(event) {
console.log(event.data);
};
// addEventListener works with any event name, e.g. 'custom-message' as sent by ServerSentEventsResource
/*
source.addEventListener('message', function(event) {
console.log(event.data);
});
*/
// when an error occurs
source.onerror = function(event) {
console.log("error!");
};
} else {
console.log("Sorry, your browser does not support server-sent events...");
}
</script>
</head>
<body>
<p>
Jersey server sent events
</p>
</body>
</html>
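Visit http://localhost:8080/sse
to open the HTML page. In the browser's developer tools you can see that the channel has been opened and stays open. Push messages with curl again and the page receives them. When several browsers (or curl) access this REST service, the most recent client replaces the earlier ones.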
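What the browser does with the stream can be approximated in a few lines of Java. The sketch below is a simplified take on the EventSource parsing rules: it handles only \n line endings and ignores the id and retry fields, and the class name SseParserSketch is hypothetical. It shows why an event named custom-message is dispatched under that name rather than as a plain message.

```java
import java.util.ArrayList;
import java.util.List;

class SseParserSketch {
    /** Parses a text/event-stream body into {eventName, data} pairs. */
    static List<String[]> parse(String stream) {
        List<String[]> events = new ArrayList<>();
        String name = "message";            // default event type
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        for (String line : stream.split("\n", -1)) {
            if (line.isEmpty()) {           // a blank line dispatches the pending event
                if (hasData) {
                    events.add(new String[] {name, data.toString()});
                }
                name = "message";
                data.setLength(0);
                hasData = false;
                continue;
            }
            if (line.startsWith(":")) {     // comment line, ignored
                continue;
            }
            int colon = line.indexOf(':');
            String field = colon < 0 ? line : line.substring(0, colon);
            String value = colon < 0 ? "" : line.substring(colon + 1);
            if (value.startsWith(" ")) {    // one leading space is not part of the value
                value = value.substring(1);
            }
            if (field.equals("event")) {
                name = value.isEmpty() ? "message" : value;
            } else if (field.equals("data")) {
                if (hasData) {
                    data.append('\n');      // multiple data lines are joined with newlines
                }
                data.append(value);
                hasData = true;
            }
        }
        return events;
    }

    public static void main(String[] args) {
        for (String[] e : parse("event: custom-message\ndata: abc\n\ndata: hi\n\n")) {
            System.out.println(e[0] + " -> " + e[1]);
        }
    }
}
```

In the page this corresponds to registering a listener for the specific event name with addEventListener, since onmessage only sees events whose type is message.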
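Other notes:
(1) No matter where the request comes from, the single EventOutput is taken over by the latest client; supporting multiple users requires maintaining a list of sessions.
(2) The EventOutput field must be declared static, otherwise sending fails; presumably the GET and POST requests are served by two different resource instances.
(3) If the browser closes abruptly, addMessage throws a broken pipe exception. At that point the EventOutput is not yet closed, and its state is only updated a few seconds later. Because the field is static, the eventOutput that was returned to the client is the one that has been interrupted. Production code needs to catch and handle this exception.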
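Notes (1) and (3) point in the same direction: keep one channel per client and drop a channel when a write to it fails. The following is a minimal sketch of that idea in plain Java, assuming Java 8 or later. The Channel interface is a hypothetical stand-in for a per-client EventOutput, and ChannelRegistrySketch is not a Jersey class. In Jersey itself, the SseBroadcaster used in DomainResource above plays a comparable role.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class ChannelRegistrySketch {
    /** Hypothetical stand-in for a per-client channel such as an EventOutput. */
    interface Channel {
        void write(String message) throws IOException;
    }

    // CopyOnWriteArrayList iterates over a snapshot, so removing during a loop is safe.
    private final List<Channel> channels = new CopyOnWriteArrayList<>();

    void register(Channel channel) {
        channels.add(channel);
    }

    int size() {
        return channels.size();
    }

    /** Writes to every channel, drops the ones that fail, and returns the delivery count. */
    int broadcast(String message) {
        int delivered = 0;
        for (Channel channel : channels) {
            try {
                channel.write(message);
                delivered++;
            } catch (IOException e) {
                // e.g. "Broken pipe" after the browser went away: forget this client
                channels.remove(channel);
            }
        }
        return delivered;
    }

    /** One healthy client and one broken client; returns {delivered, remaining}. */
    static int[] demo() {
        ChannelRegistrySketch registry = new ChannelRegistrySketch();
        List<String> received = new ArrayList<>();
        registry.register(message -> received.add(message));
        registry.register(message -> { throw new IOException("Broken pipe"); });
        int delivered = registry.broadcast("abc");
        return new int[] {delivered, registry.size()};
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("delivered=" + result[0] + ", remaining=" + result[1]);
    }
}
```

Removing the failed channel inside the catch block keeps one broken client from affecting later writes to the others.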