Sunday, March 13, 2016

A JavaFX Netty WebSocket Client

Netty is a networking framework that provides out-of-the-box support for WebSockets.  WebSockets enable bi-directional communication between a client and a server and improve upon the request-response HTTP protocol for performance and resource management.  This post presents a JavaFX client using Netty's WebSocket classes to connect to a server also implemented with Netty WebSockets.

For more information about WebSockets, see the standard (RFC 6455) or the book "Netty in Action".


The code in this post and on GitHub is based on examples in Chapters 2 and 12 of "Netty in Action".  Some of the client-side code was also taken from the WebSocketClient example in the Netty project.

All of the code in this post can be found on GitHub here.

This video shows three JavaFX WebSocket clients connecting to a WebSocket server.


The demonstration executes the following sequence of events.

1. The server is started and listens for HTTP traffic on port 8080.
2. The client connects to port 8080.  The server switches over to the WebSocket protocol via an upgrade mechanism (see the WebSocket spec).
3. The user enters text and presses the Send button.  A WebSocket frame is formed from the text and sent to the server with the saved-off connection from Step 2.
4. The server receives the text, transforms it (converts it to upper case), then sends the transformed result back to the client.
5. The client receives the transformed text and sets a JavaFX property.  JavaFX Binding automatically updates the UI component.
6. The client disconnects through pressing the Disconnect button or exiting the app.  The client-side networking resources are freed.

Design


This UML diagram shows the classes involved with the server and with the client.

UML of WebSocket Echo Example

The server uses a main class, EchoServerWS.  A Netty Pipeline is constructed that uses a custom HTTP request handler (EchoServerHttpRequestHandler) and a custom WebSocket frame handler (EchoServerWSHandler).

The client uses a main class and a JavaFX controller class: EchoClientWS and EchoClientControllerWS.  A Netty Pipeline is also constructed for the client that uses a custom WebSocket frame handler, EchoClientHandlerWS.

Code - Server Process


In main(), the server object is created and started, and its cleanup method, destroy(), is registered as a Java shutdown hook.

public static void main(String[] args) throws Exception {
 if( args.length != 1 ) {
  System.err.println("usage: java EchoServerWS port");
  System.exit(1);
 }
 
 int port = Integer.parseInt(args[0]);
 EchoServerWS endpoint = new EchoServerWS();
 ChannelFuture future = endpoint.start(new InetSocketAddress(port));
 Runtime.getRuntime().addShutdownHook(new Thread() {
  @Override
  public void run() {
   endpoint.destroy();
  }   
 });
 future.channel().closeFuture().syncUninterruptibly();
}

The start() method bootstraps, binds, and creates the Netty Pipeline.  Several fields are used to provide arguments to the later destroy() call.

private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
private final EventLoopGroup group = new NioEventLoopGroup();
private Channel channel;

public ChannelFuture start(InetSocketAddress address) {

 ServerBootstrap bootstrap = new ServerBootstrap();
 bootstrap
  .group(group)
  .channel(NioServerSocketChannel.class)
  .childHandler(createInitializer());
 
 ChannelFuture future = bootstrap.bind(address);
 future.syncUninterruptibly();
 channel = future.channel();

 return future;
}

This is the very important Netty Pipeline code.

protected ChannelInitializer<Channel> createInitializer() {

 return new ChannelInitializer<Channel>() {

  @Override
  protected void initChannel(Channel ch) throws Exception {
   ChannelPipeline p = ch.pipeline();
   p.addLast(new HttpServerCodec() );
   p.addLast(new ChunkedWriteHandler());
   p.addLast(new HttpObjectAggregator(64 * 1024));
   p.addLast(new EchoServerHttpRequestHandler("/ws"));
   p.addLast(new WebSocketServerProtocolHandler("/ws"));
   p.addLast(new EchoServerWSHandler());
  }
 };
}

The Pipeline is created with a mixture of HTTP-handling objects and WebSocket-handling objects.  A design goal of WebSockets is compatibility, so HTTP -- widely known and deployed -- is used as a known reference point to start the WebSockets communication.  Every browser and client app speaks HTTP, but not everyone is WebSocket-capable yet.
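To make the HTTP starting point a little more concrete: during the upgrade, the client sends a Sec-WebSocket-Key header and the server answers with a Sec-WebSocket-Accept header derived from it, as defined in RFC 6455.  Netty's handlers do this for you.  The following is only a standalone sketch of that one calculation using plain JDK classes.  The class and method names are made up for this illustration and are not part of Netty or of the example project.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical helper, for illustration only; Netty performs this step internally.
final class WebSocketAcceptSketch {

    // Fixed GUID that RFC 6455 appends to the client's key
    private static final String WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    // Returns the Sec-WebSocket-Accept value for a given Sec-WebSocket-Key
    static String acceptKey(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(
                (secWebSocketKey + WS_GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is required on every Java platform, so this is not expected
            throw new IllegalStateException("SHA-1 not available", e);
        }
    }

    public static void main(String[] args) {
        // Sample key taken from RFC 6455
        System.out.println(acceptKey("dGhlIHNhbXBsZSBub25jZQ=="));
    }
}
```

A client that receives the matching value knows the server understood the upgrade request, and both sides then switch from HTTP messages to WebSocket frames on the same connection.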

Also, this example was written with web apps in mind.  If you issue an HTTP request to the root URL http://localhost:8080, you'll get back a web page delivered through Netty.  In this respect, the example works as a plain web server capable of serving up a single index.html file.

The initial contact begins with the HTTP handlers, which appear first in the Pipeline: HttpServerCodec, ChunkedWriteHandler, HttpObjectAggregator, and EchoServerHttpRequestHandler.  Requests coming in to the URL /ws will trigger the removal of the HTTP handlers by the WebSocketServerProtocolHandler.  At this point, the Pipeline consists only of the WebSocket handlers: WebSocketServerProtocolHandler, EchoServerWSHandler.

Here is the destructor code.

public void destroy() {
 if( channel != null ) {
  channel.close();
 }
 channelGroup.close();
 group.shutdownGracefully();
}

Code - HTTP Handler


EchoServerHttpRequestHandler consists mostly of code that is used by web apps.  As mentioned earlier, if you visit the root URL, you will get an index.html file from the file system.  This file contains browser-based WebSocket client code that is not presented in this post.  It is available on GitHub, though.

public class EchoServerHttpRequestHandler 
 extends SimpleChannelInboundHandler<FullHttpRequest> {

private Logger logger = LoggerFactory.getLogger( EchoServerHttpRequestHandler.class );

private final String wsURI;
private File indexHTML;

public EchoServerHttpRequestHandler(String wsURI) throws URISyntaxException {
 this.wsURI = wsURI;  
 String path = null;
 URL url = EchoServerHttpRequestHandler.class.getResource("/index.html");
 path = url.getPath();
 indexHTML = new File(path);
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) 
 throws Exception {
 
if( wsURI.equalsIgnoreCase(request.getUri()) ) {
 
 ctx.fireChannelRead(request.retain());

} else {
 
 if( HttpHeaders.is100ContinueExpected(request) ) {
  send100Continue(ctx);
 }
 
 try (
  RandomAccessFile rFile = new RandomAccessFile(indexHTML, "r")
 ) {
  HttpResponse response = 
   new DefaultHttpResponse( request.getProtocolVersion(), 
    HttpResponseStatus.OK );
  response.headers().set(
   HttpHeaders.Names.CONTENT_TYPE, 
   "text/html; charset=UTF-8");
  boolean keepAlive = HttpHeaders.isKeepAlive(request);
  if( keepAlive ) {
   response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 
    rFile.length());
   response.headers().set(HttpHeaders.Names.CONNECTION, 
    HttpHeaders.Values.KEEP_ALIVE);
  }
  
  ctx.write(response);
  
  if( ctx.pipeline().get(SslHandler.class) == null ) {
   ctx.write(new DefaultFileRegion(rFile.getChannel(), 0, rFile.length()));
  } else {
   ctx.write(new ChunkedNioFile(rFile.getChannel()));
  }
  
  ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
  
  if( !keepAlive ) {
   future.addListener(ChannelFutureListener.CLOSE);
  }
 }
}
}

private void send100Continue(ChannelHandlerContext ctx) {
 FullHttpResponse response = new DefaultFullHttpResponse(
  HttpVersion.HTTP_1_1, 
  HttpResponseStatus.CONTINUE);
 ctx.writeAndFlush( response );
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
 throws Exception {
 logger.error("error processing http", cause);
 ctx.close();
}
}

Code - WebSocket Handler


The WebSocket handler receives a TextWebSocketFrame, converts the text contents to upper case, then echoes back the uppercase result using another TextWebSocketFrame.

public class EchoServerWSHandler 
  extends SimpleChannelInboundHandler<TextWebSocketFrame> {

  private Logger logger = LoggerFactory.getLogger( EchoServerWSHandler.class );

  @Override
  protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { 
    String text = msg.text().toUpperCase();
    TextWebSocketFrame outFrame = new TextWebSocketFrame(true, 0, text);
    ctx.channel().writeAndFlush(outFrame);  // send back a transformed frame
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    logger.error("error processing websocket frame", cause);
    ctx.close();
  }
}
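The first two constructor arguments in new TextWebSocketFrame(true, 0, text) are the final-fragment flag and the reserved (RSV) bits.  To show where those values end up on the wire, here is a rough sketch of how a short, unmasked server-to-client text frame is laid out according to RFC 6455.  This is not Netty's encoder.  The class name is hypothetical, and the sketch only handles payloads of up to 125 bytes.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of the RFC 6455 frame layout; Netty's own encoder does the real work.
final class TextFrameSketch {

    private static final int OPCODE_TEXT = 0x1;

    // Encodes a short unmasked text frame (the kind a server sends to a client)
    static byte[] encodeUnmaskedText(boolean fin, int rsv, String text) {
        byte[] payload = text.getBytes(StandardCharsets.UTF_8);
        if (payload.length > 125) {
            throw new IllegalArgumentException("sketch only handles payloads up to 125 bytes");
        }
        if (rsv < 0 || rsv > 7) {
            throw new IllegalArgumentException("rsv must fit in three bits");
        }
        // First byte: FIN bit, three RSV bits, four opcode bits
        int first = (fin ? 0x80 : 0) | (rsv << 4) | OPCODE_TEXT;
        ByteArrayOutputStream out = new ByteArrayOutputStream(2 + payload.length);
        out.write(first);
        // Second byte: MASK bit (0 here) plus the 7-bit payload length
        out.write(payload.length);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        for (byte b : encodeUnmaskedText(true, 0, "HELLO")) {
            System.out.printf("0x%02x ", b);
        }
        System.out.println();
    }
}
```

With fin set to true and rsv set to 0, the first byte is 0x81, which marks a complete, single-frame text message.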

Code - JavaFX Client

I'm going to skip over the trivial Application subclass, EchoClientWS.  You can see the complete code on GitHub.

EchoClientWS creates a single Stage and Scene containing the objects defined in EchoClientWS.fxml.  EchoClientWS.fxml uses EchoClientWSController as its JavaFX Controller.

In a JavaFX Controller, you use @FXML annotations to link the source code with the FXML file.  I create FXML files using either the Oracle or Gluon SceneBuilder apps.  I use JavaFX binding to control the UI.   The enabled state of the UI controls is bound using JavaFX Binding to a program flag "connected".  When connected changes in response to the Netty interface, all the UI controls will update accordingly.

This is the set of fields in the Controller class and the @FXML initialize method.

@FXML
TextField tfSend;

@FXML
TextField tfReceive;

@FXML
TextField tfHost;

@FXML
TextField tfPort;

@FXML
Button btnConnect;

@FXML
Button btnSend;

@FXML
Button btnDisconnect;

@FXML
HBox hboxStatus;

@FXML
ProgressIndicator piStatus;

@FXML
Label lblStatus;

private BooleanProperty connected = new SimpleBooleanProperty(false);
private StringProperty receivingMessageModel = new SimpleStringProperty("");
private Channel channel;
private EventLoopGroup group;

@FXML
public void initialize() {
 
 hboxStatus.setVisible(false);
 
 btnConnect.disableProperty().bind( connected );
 tfHost.disableProperty().bind( connected );
 tfPort.disableProperty().bind( connected );
 tfSend.disableProperty().bind( connected.not() );
 btnDisconnect.disableProperty().bind( connected.not() );
 btnSend.disableProperty().bind( connected.not() );

 tfReceive.textProperty().bind(receivingMessageModel);
}

Code - Client Connect


This code bootstraps and connects the client.  A Netty Pipeline is set up that is a mixture of HTTP-handling and WebSocket-handling objects.  After the handshake is completed, there will be only WebSocket-handling objects.

@FXML
public void connect() throws URISyntaxException {
 
 if( connected.get() ) {
  if( logger.isWarnEnabled() ) {
   logger.warn("client already connected; skipping connect");
  }
  return;  // already connected; should be prevented with disabled
 }
 
 String host = tfHost.getText();
 int port = Integer.parseInt(tfPort.getText());

 group = new NioEventLoopGroup();
 
 final WebSocketClientProtocolHandler handler =
  new WebSocketClientProtocolHandler(
   WebSocketClientHandshakerFactory.newHandshaker(
     new URI("ws://" + host + ":" + port + "/ws"),  // include the port so the handshake URI matches the remote address
     WebSocketVersion.V13, null, false, new DefaultHttpHeaders()));
   
 Task<Channel> task = new Task<Channel>() {

  @Override
  protected Channel call() throws Exception {
   
   updateMessage("Bootstrapping");
   updateProgress(0.1d, 1.0d);
   
   Bootstrap b = new Bootstrap();
   b
    .group(group)
    .channel(NioSocketChannel.class)
    .remoteAddress( new InetSocketAddress(host, port) )
    .handler( new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline p = ch.pipeline();
    p.addLast(new HttpClientCodec());
    p.addLast(new HttpObjectAggregator(8192));
    p.addLast(handler);
    p.addLast(new EchoClientHandlerWS(receivingMessageModel));
    }
    });
   
   updateMessage("Connecting");
   updateProgress(0.2d, 1.0d);

   ChannelFuture f = b.connect();    
   f.sync();
   Channel chn = f.channel();

   return chn;
  }

  @Override
  protected void succeeded() {
   
   channel = getValue();
   connected.set(true);
  }

  @Override
  protected void failed() {
   
   Throwable exc = getException();
   logger.error( "client connect error", exc );
   Alert alert = new Alert(AlertType.ERROR);
   alert.setTitle("Client");
   alert.setHeaderText( exc.getClass().getName() );
   alert.setContentText( exc.getMessage() );
   alert.showAndWait();
   
   connected.set(false);
  }
 };
 
 hboxStatus.visibleProperty().bind( task.runningProperty() );
 lblStatus.textProperty().bind( task.messageProperty() );
 piStatus.progressProperty().bind(task.progressProperty());
 
 new Thread(task).start();
}

Note the use of the JavaFX Task.  This is the mechanism that I'm using to handle the asynchronous operations.  I've made all my Netty calls synchronous (the sync() method), but they run on a background Thread through the Task rather than on the FX Thread.  Doubling up on asynchronous calls would make the program much more complex.

This Task has a return type of Channel.  When the Task completes, the class variable "channel" will be set to the result of the Netty connection.  The flag "connected" will also be set.  When "connected" changes, the bindings set up in initialize() adjust the UI controls.

There are two handlers of interest in the connect() call.  One is the WebSocketClientProtocolHandler held in the local variable "handler", which wraps a WebSocket handshaker object.  The other, EchoClientHandlerWS, is the business logic for the client side.
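The connect() method reads the host and port straight from the text fields, so a typo in the port field surfaces as a NumberFormatException.  One way to tighten this up is to validate the input and build the handshake URI in one place.  This is a sketch of that idea using only java.net.URI.  The class and method names are assumptions made for this example and are not in the GitHub project.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper for building the ws:// URI from user input.
final class WsUriSketch {

    // Validates host and port text, then builds ws://host:port/path
    static URI wsUri(String host, String portText, String path) {
        String trimmedHost = host == null ? "" : host.trim();
        if (trimmedHost.isEmpty()) {
            throw new IllegalArgumentException("host is required");
        }
        int port;
        try {
            port = Integer.parseInt(portText == null ? "" : portText.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("port must be a number: " + portText, e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        try {
            // The multi-argument constructor assembles and checks the URI parts
            return new URI("ws", null, trimmedHost, port, path, null, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("invalid WebSocket address", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(wsUri("localhost", "8080", "/ws"));
    }
}
```

In the controller, the IllegalArgumentException could be caught in connect() and shown in an Alert before any Netty objects are created.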

Code - Client Disconnect


Before getting into the send() operation, I'll present the disconnect function which destroys the objects set up in the previous section.

@FXML
public void disconnect() {
 
 if( !connected.get() ) {
  if( logger.isWarnEnabled() ) {
   logger.warn("client not connected; skipping disconnect");
  }
  return;
 }
 
 Task<Void> task = new Task<Void>() {

  @Override
  protected Void call() throws Exception {
   
   updateMessage("Disconnecting");
   updateProgress(0.1d, 1.0d);
   
   channel.close().sync();     

   updateMessage("Closing group");
   updateProgress(0.5d, 1.0d);
   group.shutdownGracefully().sync();

   return null;
  }

  @Override
  protected void succeeded() {
   
   connected.set(false);
  }

  @Override
  protected void failed() {
   
   connected.set(false);

   Throwable t = getException();
   logger.error( "client disconnect error", t );
   Alert alert = new Alert(AlertType.ERROR);
   alert.setTitle("Client");
   alert.setHeaderText( t.getClass().getName() );
   alert.setContentText( t.getMessage() );
   alert.showAndWait();

  }
  
 };
 
 hboxStatus.visibleProperty().bind( task.runningProperty() );
 lblStatus.textProperty().bind( task.messageProperty() );
 piStatus.progressProperty().bind(task.progressProperty());

 new Thread(task).start();
}

In the JavaFX Tasks for the connect() and disconnect() methods, I'm using JavaFX binding to toggle the state of some status controls.  While the Task is running, a section of the UI containing a ProgressIndicator and a Label (piStatus, lblStatus) is displayed.

When using JavaFX Tasks, it's important to separate your FX-changing code from your service calls.  Code that updates the UI should be relegated to the succeeded() or failed() methods.  The exceptions are the updateMessage() and updateProgress() calls, which are safe to make from call() because the Task applies their updates on the FX Thread.

You can call Platform.runLater() in the middle of the call() method, but this can lead to undesired results.  Sometimes, the UI isn't updated until after the call() method which obviates the need to put it in the call() method in the first place.  Also, you can get some blocking if you're trying to have the call() method do work yet update the UI as you are going along.
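The split between the blocking work and the UI update can be illustrated without JavaFX.  The sketch below is only an analogy built from plain java.util.concurrent classes: a "worker" executor stands in for the Thread running Task.call(), and a single-threaded "ui" executor stands in for the FX Thread where succeeded() runs.  None of these names come from the example project.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Analogy only: plain executors standing in for the Task thread and the FX Thread.
final class BackgroundCallSketch {

    // Returns { thread that ran the blocking step, thread that ran the "UI" step }
    static String[] runOffUiThread() {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui"));
        try {
            return CompletableFuture
                // Stand-in for call(): a blocking operation such as f.sync() would go here
                .supplyAsync(() -> Thread.currentThread().getName(), worker)
                // Stand-in for succeeded(): runs only after the blocking step has finished
                .thenApplyAsync(callThread ->
                    new String[] { callThread, Thread.currentThread().getName() }, ui)
                .join();
        } finally {
            worker.shutdown();
            ui.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = runOffUiThread();
        System.out.println("blocking step ran on: " + threads[0]);
        System.out.println("ui step ran on: " + threads[1]);
    }
}
```

The point is the same as with the Task: the blocking step never occupies the thread that owns the UI, and the UI step only sees a finished result.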

Code - Client Send


The send() call forms a TextWebSocketFrame and calls writeAndFlush() on the Channel.  This is also using a JavaFX Task and the Netty call is executed synchronously.

@FXML
public void send() {
 
 if( !connected.get() ) {
  if( logger.isWarnEnabled() ) {
   logger.warn("client not connected; skipping write");
  }
  return;
 }
 
 final String toSend = tfSend.getText();
 
 Task<Void> task = new Task<Void>() {

  @Override
  protected Void call() throws Exception {
   
   TextWebSocketFrame frame = new TextWebSocketFrame(
    true, 0, Unpooled.copiedBuffer(toSend, CharsetUtil.UTF_8));
   ChannelFuture f = channel.writeAndFlush( frame);
   f.sync();

   return null;
  }
  
  @Override
  protected void failed() {
   
   Throwable exc = getException();
   logger.error( "client send error", exc );
   Alert alert = new Alert(AlertType.ERROR);
   alert.setTitle("Client");
   alert.setHeaderText( exc.getClass().getName() );
   alert.setContentText( exc.getMessage() );
   alert.showAndWait();
   
   connected.set(false);
  }

 };
 
 hboxStatus.visibleProperty().bind( task.runningProperty() );
 lblStatus.textProperty().bind( task.messageProperty() );
 piStatus.progressProperty().bind(task.progressProperty());
 
 new Thread(task).start();
}
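One detail that the send() code never has to deal with is masking.  RFC 6455 requires frames sent from a client to a server to be masked with a 4-byte key, and as far as I know Netty's client-side frame encoder takes care of this once the handshake completes.  The following sketch shows just the masking step in plain Java.  The class name is hypothetical, and the sample key and bytes are the ones used in the RFC's "Hello" example.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of RFC 6455 payload masking; not Netty's implementation.
final class MaskingSketch {

    // XORs each payload byte with the mask key byte at position (i mod 4).
    // Applying the same key a second time restores the original bytes.
    static byte[] applyMask(byte[] payload, byte[] maskKey) {
        if (maskKey.length != 4) {
            throw new IllegalArgumentException("mask key must be 4 bytes");
        }
        byte[] out = new byte[payload.length];
        for (int i = 0; i < payload.length; i++) {
            out[i] = (byte) (payload[i] ^ maskKey[i % 4]);
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] key = { 0x37, (byte) 0xfa, 0x21, 0x3d };
        byte[] masked = applyMask("Hello".getBytes(StandardCharsets.UTF_8), key);
        byte[] unmasked = applyMask(masked, key);
        System.out.println(new String(unmasked, StandardCharsets.UTF_8));
    }
}
```

Because masking is a plain XOR, the server recovers the text by applying the same key again, which is why the handler on the server side can simply call msg.text().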

Code - Client WS Handler


Finally, this is the code for the custom WebSocket Netty handler used by the client.  The channelRead0() method is invoked when a TextWebSocketFrame is sent from the server.  A JavaFX Property is updated and this will result in the UI being updated.

@Sharable
public class EchoClientHandlerWS extends SimpleChannelInboundHandler<TextWebSocketFrame> {

 private Logger logger = LoggerFactory.getLogger( EchoClientHandlerWS.class );

 private final StringProperty receivingMessageModel;
 
 public EchoClientHandlerWS(StringProperty receivingMessageModel) {
  this.receivingMessageModel = receivingMessageModel;
 }
 
 @Override
 protected void channelRead0(ChannelHandlerContext arg0, TextWebSocketFrame in) throws Exception {  
  final String cm = in.text();
  Platform.runLater( () -> receivingMessageModel.set(cm) );
 }

 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  logger.error( "error in echo client", cause );
  ctx.close();
 } 
}

Notice the use of the Platform.runLater() method.  Because this will indirectly result in a UI update, I'm making sure that the set(cm) call will execute on the FX Thread.

Wrap Up


To learn more about this example, you can find the source on GitHub.  Simply import the project into the IDE of your choice (I've tested IntelliJ and Eclipse), then run the EchoServerWS program followed by the EchoClientWS program.

WebSockets address a major shortcoming in protocols like HTTP and RMI: the server cannot initiate conversations with clients.  While there are creative solutions (Ajax, polling) that make HTTP usable in RIAs, a true bi-directional solution is far preferable for resource management and performance.  WebSockets are an IETF standard, are available in most current browsers, and can be integrated in JavaFX applications with the Netty framework.  This makes WebSockets a great tool to keep in your programmer's toolbox.

