Friday, March 4, 2016

A Symphony of Asynchrony: JavaFX Tasks and Netty Sockets

This blog post shows how to integrate the Netty client/server framework into a JavaFX application.  While you can build a distributed app using simple request/response protocols like HTTP or RMI, these protocols are often inefficient and deficient for apps requiring steady server-side updates, push notifications, and long-running operations.  Netty uses an efficient network implementation based on asynchronous processing and a state-based connection to avoid hacks like polling to refresh your client-side code.

When integrating Netty with JavaFX, you need to make sure that UI interaction happens on the FX Thread and that nothing blocks it.  This means wrapping your Netty calls in the FX Task class.  A Task runs the long-running operation on a background Thread, so in most cases you can let Netty wait for a response with the sync() call, which blocks that background thread without freezing the app.


This example is based on an echo client and server program found in "Netty in Action" by Norman Maurer and Marvin Allen Wolfthal.  After a connection is established, a client gathers a java.lang.String and sends it to the server.  The server transforms the String using toUpperCase() and sends the transformation back to the client.  The client displays the String in the UI.

All of the code in this post is found on GitHub.

This video shows the client running on a Mac and talking with the server which is running on the same Mac.  The consoles in the lower part of the screen show the server and the client talking as operations are executed from the UI.


This video shows the same program running multiple clients on Windows talking with the server.  Notice the brief glimpse of a progress bar when the Connect and Send Buttons are pressed.  Notice the more substantial showing when the Disconnect Button is pressed.


Design

I've packed both the server and the client code in the same Maven project for convenience.  The following UML class diagram shows the classes involved in the program.

FX Echo Client Class Diagram
EchoServer and EchoClient contain main() functions that are the entry points for the server and client processes.  EchoServer contains Netty code for bootstrapping, binding, and setting up the Pipeline with the custom EchoServerHandler.  EchoClient creates the UI object EchoClientController which contains Netty code for connecting, disconnecting, sending, and receiving.  EchoClientController also sets up the client Pipeline with the EchoClientHandler.

This diagram shows the connect/send/receive/disconnect sequence.  It's not normalized, so some of the operations ("Enter Text", "Netty Connect") are notional and not found in the code.  Much of the program's data exchange is implemented using off-the-shelf JavaFX binding and Netty Futures.

FX Echo Client Sequence Diagram
The sequence diagram is summarized as follows.
  1. The user presses the Connect Button. 
  2. EchoClientController bootstraps and connects to the EchoServer.
  3. The user enters text and presses the Send Button.  
  4. A writeAndFlush() operation is called on the Channel.  EchoServerHandler's channelRead() and channelReadComplete() methods are called.
  5. The EchoServerHandler's channelRead() method executes its own write() method and the channelReadComplete() method executes a flush().
  6. EchoClientHandler receives the data. 
  7. EchoClientHandler sets a StringProperty which is bound to the UI.  Automatically, the TextField in the UI is updated.
  8. The user presses the Disconnect Button.  
  9. EchoClientController closes the Channel and shuts down the EventLoopGroup (not pictured).
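
To make the round trip concrete without Netty or JavaFX, here is a minimal sketch of the same connect / send / uppercase / receive / disconnect sequence using blocking java.net sockets.  It is not the code from this post: the class name PlainSocketEcho, the loopback address, and the newline-terminated framing are all assumptions made for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical stand-in for the echo pair: blocking sockets, one line per message.
final class PlainSocketEcho {

    /** Starts a daemon thread that accepts one client and echoes each line in upper case. */
    static void serveOneClient(ServerSocket server) {
        Thread t = new Thread(() -> {
            try (Socket s = server.accept();
                 BufferedReader in = reader(s);
                 PrintWriter out = writer(s)) {
                String line;
                while ((line = in.readLine()) != null) {
                    out.println(line.toUpperCase(Locale.ROOT));
                }
            } catch (IOException e) {
                // The socket was closed; nothing more to echo in this sketch.
            }
        }, "echo-server");
        t.setDaemon(true);
        t.start();
    }

    /** Connects, sends one line, reads the reply, and disconnects. */
    static String roundTrip(String text) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 asks the OS for any free port.
        try (ServerSocket server = new ServerSocket(0, 1, loopback)) {
            serveOneClient(server);
            try (Socket s = new Socket(loopback, server.getLocalPort());
                 BufferedReader in = reader(s);
                 PrintWriter out = writer(s)) {
                out.println(text);     // steps 3-4: send
                return in.readLine();  // steps 6-7: receive the transformed text
            }
        }
    }

    private static BufferedReader reader(Socket s) throws IOException {
        return new BufferedReader(new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
    }

    private static PrintWriter writer(Socket s) throws IOException {
        // autoFlush = true so println() pushes the line onto the wire immediately.
        return new PrintWriter(new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true);
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("hello"));
    }
}
```

Every call in this version blocks its calling thread, which is exactly why the real client wraps its Netty calls in Tasks instead of running them from a Button handler.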

Client Code


All of the code is on GitHub, so I'm going to focus this post on the client-side JavaFX / Netty interaction.  I'm leaving out the trivial EchoClient JavaFX Application subclass which creates a Stage and loads the EchoClient.fxml file.  The significant client code is in the EchoClientController class.

connect()



The connect() method takes a host and port from the UI and creates a Netty Channel, which is saved as a field of EchoClientController.

From EchoClientController.java

@FXML
HBox hboxStatus;
 
@FXML
ProgressIndicator piStatus;
 
@FXML
Label lblStatus;
 
private BooleanProperty connected = new SimpleBooleanProperty(false);
private StringProperty receivingMessageModel = new SimpleStringProperty("");
private Channel channel;

@FXML
public void connect() {
 
 if( connected.get() ) {
  return;  // already connected; should be prevented with disabled
 }
 
 String host = tfHost.getText();
 int port = Integer.parseInt(tfPort.getText());

 group = new NioEventLoopGroup();
 
 Task<Channel> task = new Task<Channel>() {
  @Override
  protected Channel call() throws Exception {
   
   updateMessage("Bootstrapping");
   updateProgress(0.1d, 1.0d);
   
   Bootstrap b = new Bootstrap();
   b
   .group(group)
   .channel(NioSocketChannel.class)
   .remoteAddress( new InetSocketAddress(host, port) )
   .handler( new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
     ch.pipeline().addLast(new EchoClientHandler(receivingMessageModel));
    }
   });
   
   ChannelFuture f = b.connect();
   Channel chn = f.channel();
   
   updateMessage("Connecting");
   updateProgress(0.2d, 1.0d);

   f.sync();

   return chn;
  }

  @Override
  protected void succeeded() {
   
   channel = getValue();
   connected.set(true);
  }

  @Override
  protected void failed() {
   
   Throwable exc = getException();
   logger.error( "client connect error", exc );
   Alert alert = new Alert(AlertType.ERROR);
   alert.setTitle("Client");
   alert.setHeaderText( exc.getClass().getName() );
   alert.setContentText( exc.getMessage() );
   alert.showAndWait();
   
   connected.set(false);
  }
 };
 
 hboxStatus.visibleProperty().bind( task.runningProperty() );
 lblStatus.textProperty().bind( task.messageProperty() );
 piStatus.progressProperty().bind(task.progressProperty());
 
 new Thread(task).start();
}

The Netty calls to bootstrap and connect are wrapped up in a JavaFX Task.  Task is key in JavaFX programming and as a rule-of-thumb I put anything in a Task that has the potential to take longer than 1 second.  This means that I put pretty much everything into a Task except for RAM-based manipulation of Java objects.

The Task exposes several properties: runningProperty, messageProperty, progressProperty.  I bind these to UI elements: a container HBox, a Label, a ProgressIndicator.  JavaFX binding obviates the need to register listeners and to call setter() methods on the UI controls.

The call() method returns a Channel.  In this implementation, I don't care about Netty's asynchronous behavior -- I'm already running on a new Thread() -- so I can wait until its sync() call returns.  The returned Channel value is set to a field in the succeeded() method.  If a Netty exception is thrown, the failed() method is invoked, and a message is logged and displayed to the user in a Dialog.

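succeeded() and failed() are executed on the FX Thread while call() is not.  updateMessage() and updateProgress() are called from call() on the background thread, but they hand their updates over to the FX Thread, which is why they are safe to use there.  call() should not touch the UI directly; it should work exclusively with the time-consuming Netty operation.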
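
A rough, JavaFX-free model of that thread split may help.  In this sketch a single-thread executor named "fake-ui" stands in for the FX Thread and a second executor stands in for the Thread that runs call().  All names are hypothetical, and a sleep() stands in for the blocking sync().

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical model of "block on a worker, touch the UI only on the UI thread".
final class BlockingOffUiThreadSketch {

    /** Returns the events that were applied on the fake UI thread, in order. */
    static List<String> run() throws Exception {
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-ui"));
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        List<String> seenOnUi = new CopyOnWriteArrayList<>();
        try {
            Future<String> call = worker.submit(() -> {
                // Like updateMessage(): invoked on the worker, applied on the UI thread.
                ui.execute(() -> {
                    seenOnUi.add("message=Connecting@" + Thread.currentThread().getName());
                });
                Thread.sleep(100);  // stands in for the blocking f.sync()
                return "channel";
            });

            // Wait for the worker to finish; the fake UI thread stays free meanwhile.
            String value = call.get(5, TimeUnit.SECONDS);

            // Like succeeded(): runs on the UI thread after call() has returned.
            ui.submit(() -> {
                seenOnUi.add("succeeded=" + value + "@" + Thread.currentThread().getName());
            }).get(5, TimeUnit.SECONDS);

            return seenOnUi;
        } finally {
            worker.shutdown();
            ui.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        run().forEach(System.out::println);
    }
}
```

Both recorded events carry the fake-ui thread name even though the blocking work happened on the worker, which is the same division of labor Task gives you.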

send()


The send() method uses the saved-off Channel object to issue a writeAndFlush() call.  The write travels outbound through the Channel's pipeline via the Netty framework; the server's reply comes back inbound through the EchoClientHandler.

Also from EchoClientController.java

@FXML
public void send() {

 if( !connected.get() ) {
  return;
 }
 
 final String toSend = tfSend.getText();
 
 Task<Void> task = new Task<Void>() {

  @Override
  protected Void call() throws Exception {
   
   ChannelFuture f = channel.writeAndFlush( Unpooled.copiedBuffer(toSend, CharsetUtil.UTF_8) );
   f.sync();

   return null;
  }
  
  @Override
  protected void failed() {
   
   Throwable exc = getException();
   logger.error( "client send error", exc );
   Alert alert = new Alert(AlertType.ERROR);
   alert.setTitle("Client");
   alert.setHeaderText( exc.getClass().getName() );
   alert.setContentText( exc.getMessage() );
   alert.showAndWait();
   
   connected.set(false);
  }

 };
 
 hboxStatus.visibleProperty().bind( task.runningProperty() );
 lblStatus.textProperty().bind( task.messageProperty() );
 piStatus.progressProperty().bind(task.progressProperty());
 
 new Thread(task).start();
}

Notice the similar pattern to connect().  The newly-created Task binds to the same three progress objects.  There is no succeeded() method and failed() contains the same logic as the connect() implementation's error handler.

This Task doesn't return anything (Void return type).  It optimistically assumes the write went through and relies on failed() to report an error if it didn't.  Because the call() method is already running on a new Thread, I can afford to wait in the sync() method.
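
Waiting in sync() trades the callback style for a simpler blocking one.  The sketch below contrasts the two styles using the JDK's CompletableFuture as a stand-in for a Netty ChannelFuture; the class and method names are hypothetical, and join() only approximates sync() in that both wait for completion and surface a failure as an exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical comparison of "wait for the future" versus "register a listener".
final class SyncVersusListener {

    /** Blocking style: wait here; a failure arrives as an exception on this thread. */
    static String blocking(CompletableFuture<String> f) {
        try {
            return "ok:" + f.join();
        } catch (CompletionException e) {
            return "failed:" + e.getCause().getMessage();
        }
    }

    /** Callback style: register a listener and return immediately. */
    static void listening(CompletableFuture<String> f, StringBuilder sink) {
        f.whenComplete((value, error) ->
                sink.append(error == null ? "ok:" + value : "failed:" + error.getMessage()));
    }

    public static void main(String[] args) {
        CompletableFuture<String> bad = new CompletableFuture<>();
        bad.completeExceptionally(new IllegalStateException("refused"));
        System.out.println(blocking(bad));

        StringBuilder sink = new StringBuilder();
        CompletableFuture<String> later = new CompletableFuture<>();
        listening(later, sink);      // nothing happens yet
        later.complete("channel");   // the listener fires now
        System.out.println(sink);
    }
}
```

The blocking form keeps success and failure handling in one place, which maps neatly onto a Task's call() and failed() methods; the listener form is what you would reach for if you could not spare a thread to wait.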

disconnect()


The disconnect() method uses the same Task pattern as the previous two methods.  As in connect(), each stage is announced with an updateMessage()/updateProgress() pair.  Here the two stages wrap up the connection with Netty: closing the Channel, then shutting down the EventLoopGroup.  The sync() on the close() will take a little while.  The shutdownGracefully() takes substantially longer.  However, as you may recall from the videos, at no point did the UI become unresponsive.

@FXML
public void disconnect() {
 
 if( !connected.get() ) {
  return;
 }
 
 Task<Void> task = new Task<Void>() {

  @Override
  protected Void call() throws Exception {
   
   updateMessage("Disconnecting");
   updateProgress(0.1d, 1.0d);
   
   channel.close().sync();     

   updateMessage("Closing group");
   updateProgress(0.5d, 1.0d);
   group.shutdownGracefully().sync();

   return null;
  }

  @Override
  protected void succeeded() {
   
   connected.set(false);
  }

  @Override
  protected void failed() {
   
   connected.set(false);

   Throwable t = getException();
   logger.error( "client disconnect error", t );
   Alert alert = new Alert(AlertType.ERROR);
   alert.setTitle("Client");
   alert.setHeaderText( t.getClass().getName() );
   alert.setContentText( t.getMessage() );
   alert.showAndWait();

  }
  
 };
 
 hboxStatus.visibleProperty().bind( task.runningProperty() );
 lblStatus.textProperty().bind( task.messageProperty() );
 piStatus.progressProperty().bind(task.progressProperty());

 new Thread(task).start();
}

Reading


Reading from the server is brokered through the EchoClientHandler object.  This object was created with a reference to a StringProperty, which is a model element that the UI also binds to.  I could have passed UI elements into the handler directly; however, this would violate the separation of concerns and make it more difficult to apply this notification to more than one view.  That is, the StringProperty can be bound to any number of UI elements and an update from the handler will update all the UI elements.
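
The idea of one model feeding many views can be sketched without JavaFX.  Below, a tiny hand-rolled TextModel stands in for the StringProperty and FakeView stands in for a UI control; both are hypothetical names, and real JavaFX binding does considerably more than this listener loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, JavaFX-free stand-in for a StringProperty and its bound views.
final class SharedModelSketch {

    /** Minimal observable string: every listener is told about each new value. */
    static final class TextModel {
        private final List<Consumer<String>> listeners = new ArrayList<>();
        private String value = "";

        void addListener(Consumer<String> listener) {
            listeners.add(listener);
        }

        void set(String newValue) {
            value = newValue;
            for (Consumer<String> listener : listeners) {
                listener.accept(newValue);
            }
        }

        String get() {
            return value;
        }
    }

    /** Stands in for a UI control that displays text. */
    static final class FakeView {
        String text = "";
    }

    /** The "handler" knows only the model, never the views. */
    static void onMessage(TextModel model, String received) {
        model.set(received);
    }

    static String demo() {
        TextModel model = new TextModel();
        FakeView field = new FakeView();
        FakeView statusBar = new FakeView();

        // Two independent views follow the same model.
        model.addListener(v -> field.text = v);
        model.addListener(v -> statusBar.text = "last: " + v);

        onMessage(model, "HELLO");
        return field.text + " | " + statusBar.text;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Adding a third view is one more addListener() call; onMessage() does not change, which is the separation of concerns the StringProperty buys in the real client.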

This is the code for EchoClientHandler.java.  Note the FX Thread protection in the channelRead0() method.

@Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

 private Logger logger = LoggerFactory.getLogger( EchoClientHandler.class );

 private final StringProperty receivingMessageModel;
 
 public EchoClientHandler(StringProperty receivingMessageModel) {
  this.receivingMessageModel = receivingMessageModel;
 }
 
 @Override
 protected void channelRead0(ChannelHandlerContext arg0, ByteBuf in) 
              throws Exception {
  final String cm = in.toString(CharsetUtil.UTF_8);
  Platform.runLater( () -> receivingMessageModel.set(cm) );
 }

 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  logger.error( "error in echo client", cause );
  ctx.close();
 } 
}

One final note on the binding sequence: we don't know when channelRead0() will be called (in this case we are relying on Netty's asynchrony), but when it is called, we set the model object.  Because channelRead0() runs on a Netty thread, I wrap the model update in Platform.runLater() for FX Thread protection.  FX, as a binding framework, will then update any bound UI elements such as the TextField.

Client Code Wrap Up


The key to integrating Netty with JavaFX is to use Tasks.  Tasks keep the UI responsive and allow for visual feedback through their exposed properties.  Tasks make some of Netty's asynchronous processing unneeded (at least at the application level), so the Tasks can freely block without fear of locking the UI.  When being notified of new data, consider using JavaFX binding brokered through a dedicated model object to update the UI instead of making direct calls to specific objects.

Server Code

I'm going to present the server code without comment here as this post is about client-side Netty.  It's close to the example in the Manning book.

From EchoServer.java

public class EchoServer {

 private final int port;
 
 public EchoServer(int port) {
  this.port = port;
 }
 
 public static void main(String[] args) throws Exception {
  if( args.length != 1 ) {
   System.err.println("usage: java EchoServer port");
   System.exit(1);
  }
  
  int port = Integer.parseInt(args[0]);
  new EchoServer(port).start();
 }
 
 public void start() throws Exception {
  
  final EchoServerHandler echoServerHandler = new EchoServerHandler();
  
  EventLoopGroup group = new NioEventLoopGroup();
  
  try {
   ServerBootstrap b = new ServerBootstrap();
   b
    .group(group)
    .channel(NioServerSocketChannel.class)
    .localAddress(new InetSocketAddress(port))
    .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
      ch.pipeline().addLast( echoServerHandler );
     }     
    });
    
   ChannelFuture f = b.bind().sync();
   
   f.channel().closeFuture().sync();
   
  } finally {
   group.shutdownGracefully().sync();
  }
 }
}

From EchoServerHandler.java

@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

 private Logger logger = LoggerFactory.getLogger( EchoServerHandler.class );
 
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  
  ByteBuf in = (ByteBuf)msg;
  String in_s = in.toString(CharsetUtil.UTF_8);
  String uc = in_s.toUpperCase();
  if( logger.isInfoEnabled() ) {
   logger.info("[READ] read " + in_s + ", writing " + uc);
  }
  // overwrites the buffer in place; assumes uc has the same UTF-8 byte length as in_s
  in.setBytes(0,  uc.getBytes(CharsetUtil.UTF_8));
  ctx.write(in);  // writes bytes back to sender (no flush)
 }

 @Override
 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 
  if( logger.isDebugEnabled() ) {
   logger.debug("[READ COMPLETE]");
  }
  ctx.flush();
 }
 
 @Override
 public void channelActive(ChannelHandlerContext ctx) throws Exception {
  super.channelActive(ctx);
  if(logger.isDebugEnabled() ) {
   logger.debug("[CHANNEL ACTIVE]");
  }
  ctx.channel().closeFuture().addListener(f -> logger.debug("[CLOSE]"));
 }

 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  logger.error( "error in echo server", cause);
  ctx.close();
 }
}
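
One detail worth checking if you adapt the handler: channelRead() writes the upper-cased bytes back into the incoming buffer, which fits neatly only when the upper-cased text has the same number of UTF-8 bytes as the original.  The small Netty-free check below illustrates that assumption.  The class name is hypothetical, and it uses Locale.ROOT for repeatable results, whereas the handler uses the default locale.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical helper: does upper-casing preserve the UTF-8 byte count?
final class UpperCaseByteLength {

    static int utf8Length(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }

    /** True when the upper-cased text would exactly fill the original's bytes. */
    static boolean sameLengthAfterUpperCase(String s) {
        return utf8Length(s) == utf8Length(s.toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        // Plain ASCII keeps its size.
        System.out.println(sameLengthAfterUpperCase("hello"));
        // Dotless i (U+0131, two bytes) upper-cases to a one-byte "I".
        System.out.println(sameLengthAfterUpperCase("\u0131"));
    }
}
```

For ASCII input like the demo's, the in-place overwrite is fine; for arbitrary text, allocating a fresh buffer for the reply would be the safer choice.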

Even though today's fast computers can expend extra CPU cycles on polling, an efficient network layer will make your application more responsive and dynamic and will also keep the server from doing extra work.

2 comments:

  1. Good day Sir!
    I hope you can give pointers on the Server having its own Javafx UI. Meaning, incoming messages from clients will be displayed on a JavaFX control, like, TextArea.

    Answering to this post will be much appreciated.

    Rey

    1. I don't usually run GUIs on servers. Maybe you're looking for a client that can tap into logging and events on a server?
