Sunday, September 6, 2015

Synchronizing the Asynchronous in Core Java

There are about half a dozen posts on this blog advocating for adding asynchronous behavior to JavaFX apps to make the apps appear lively and responsive.  This post is the opposite.  It's about wrapping up an asynchronous operation in a synchronous API.  Although with Java 8 it's never been more convenient to pass around blocks of code as callbacks, sometimes you just want to wait and get a result.

This post was born out of an interface with a web application.  Using an embedded browser, calls are made to my JavaFX app from JavaScript.  To keep my JavaFX app responsive, I make broad use of the JavaFX Task class.  However, the interfacing web app is doing the same thing.  Were I to make the whole project asynchronous -- passing around a callback for every operation -- I'd quickly lose readability and have to add extra code in the callbacks to give them the appropriate context.

My solution continues to rely on the JavaFX Task class, but I'm wrapping up some of the calls to the web application using the BlockingQueue technique presented in this post.  That way, I can continue to structure my Task code using call/succeeded/cancelled/failed and don't have to worry about loading up my call with a complicated lambda.

All code is available as a Maven project on GitHub.

AsyncDAO

This first class is my asynchronous Data Access Object (DAO), which simulates a long-running operation.  The fetchData() call returns immediately.  It takes a pair of callback parameters and does not provide a return value.  When data becomes available, the readyCallback is invoked with the data that's retrieved.  If there is an error, the failedCallback is called with an error message as an argument.  I'm using an AtomicInteger as part of the test data rather than a plain int in case I have multiple AsyncDAO objects called from multiple threads (that case is not covered in this post).


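import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class AsyncDAO {

    private static final AtomicInteger counter = new AtomicInteger(0);

    // Called by SyncDAOMain between runs; the body is assumed here to restart the counter.
    public static void reset() {
        counter.set(0);
    }

    public void fetchData(Consumer<String> readyCallback, Consumer<String> failedCallback) {
        new Thread() {
            @Override
            public void run() {
                try {
                    Thread.sleep(2000);  // simulate a long-running operation
                    // in a real implementation, catch exceptions here and call failedCallback
                    String data = "here is the data " + counter.incrementAndGet();
                    readyCallback.accept(data);
                } catch(InterruptedException exc) {
                    failedCallback.accept("fetch was interrupted");
                }
            }
        }.start();
    }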
}
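
To make the "returns immediately" behavior concrete, here is a minimal sketch that times the call itself. It uses a stand-in for AsyncDAO with a shorter delay; the class name AsyncCallDemo, the timeTheCall() helper, and the 200 ms delay are assumptions for illustration and are not part of the GitHub project.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class AsyncCallDemo {

    // Stand-in for the post's AsyncDAO.fetchData(), with a shorter (assumed) delay.
    static void fetchData(Consumer<String> readyCallback, Consumer<String> failedCallback) {
        new Thread(() -> {
            try {
                Thread.sleep(200);
                readyCallback.accept("here is the data");
            } catch (InterruptedException e) {
                failedCallback.accept("fetch was interrupted");
            }
        }).start();
    }

    // Starts a fetch and returns how many milliseconds the fetchData() call itself took.
    // The latch is counted down once either callback has run.
    static long timeTheCall(CountDownLatch done) {
        long start = System.nanoTime();
        fetchData(
            d -> { System.out.println("callback got: " + d); done.countDown(); },
            e -> { System.out.println("callback failed: " + e); done.countDown(); }
        );
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        long elapsed = timeTheCall(done);
        System.out.println("fetchData() returned after " + elapsed + " ms");
        done.await(); // keep main waiting until the callback has run
    }
}
```

The call returns well before the simulated work finishes, so the caller has nothing to return yet. That gap is what the wrappers in the rest of this post close.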

Busy Loop

Before I present the BlockingQueue solution, I'd like to present an approach that does not involve a BlockingQueue.  This example waits for the result of the asynchronous operation using a busy loop.  As fast as the processor allows, the code will check the pair of booleans dataAvailable and wasError to see if either has changed.  A change in these values indicates that the code can transition out of the busy loop and report either the data or the error.

Here is the Busy Loop code.

public class SyncDAOWrapperBusy {

    volatile String data = "";
    volatile String errorMessage = "";
    volatile boolean dataAvailable = false;
    volatile boolean wasError = false;

    AsyncDAO ad = new AsyncDAO();

    public String fetchData() throws SyncDAOException {

        ad.fetchData(
                (d) -> dataReady(d),
                (e) -> error(e)
        );

        while( !dataAvailable && !wasError ) {}

        if( wasError ) {
            String savedMessage = errorMessage;
            reset();
            throw new SyncDAOException( savedMessage );
        }

        String savedData = data;
        reset();

        return savedData;
    }

    private void reset() {
        data = "";
        errorMessage = "";
        dataAvailable = false;
        wasError = false;
    }

    private void dataReady(String d) {
        data = d;
        dataAvailable = true;
    }

    private void error(String message) {
        errorMessage = message;
        wasError = true;
    }
}

The Busy Loop code issues the fetchData() call then proceeds into a loop that waits for one of the two flags to be set by the asynchronous operation.  When one of the flags is set (dataAvailable or wasError), the method proceeds to handle either the error case (by throwing an Exception) or the success case (returning the data).

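As an aside, the volatile keyword appears here because, without it, nothing guarantees that the looping thread ever sees the writes made by the callback thread.  The JIT compiler is free to read the flags once and hoist the check out of the loop, which is how my busy loop turned into a true infinite loop (go figure).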

Functionally, the code works as expected.  However, look at this screenshot from VisualVM.

[Screenshot: A Busy Loop Hogs the Processor]

After an initial spike, the program keeps the CPU running at 25%.  This is despite no real work being done.  The processor is frantically checking the status of my variables.  Obviously, this won't scale to many more instances of the Busy Loop or to an extremely long-running process.

Blocking Queues


Before I present a BlockingQueue approach, let's look at the bottom line.  Here is a similar VisualVM screenshot, but notice that the CPU is barely being used.  As the program runs longer, the CPU usage gets very close to 0%.

[Screenshot: BlockingQueues Don't Tie Up the Processor]

The difference is that my BlockingQueue implementation doesn't poll.  BlockingQueue.take() blocks the calling thread until an element is available, and a blocked thread uses no CPU while it waits.  I'm still working with the member variables to communicate between my asynchronous operation and the calling wrapper, but I don't have a loop that's spinning on flags.  My fetchData() call stops at the BlockingQueue.take() method after calling the asynchronous operation (a new Thread).  When the Thread finishes -- either as a success or failure -- BlockingQueue.put() is called from the passed-in callback, which wakes up the waiting caller.

Here is the code for the BlockingQueue implementation.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SyncDAOWrapper {

    String data = "";
    String errorMessage = "";
    boolean wasError = false;

    // capacity of 1: at most one result is in flight per fetchData() call
    final BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(1);

    AsyncDAO ad = new AsyncDAO();

    public String fetchData() throws SyncDAOException {

        ad.fetchData(
            (d) -> dataReady(d),
            (e) -> error(e)
        );

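        try {
            blockingQueue.take();  // blocks until one of the callbacks calls put()
        } catch(InterruptedException exc) {
            // don't fall through and return empty data; restore the flag and report it
            Thread.currentThread().interrupt();
            throw new SyncDAOException( "interrupted while waiting for data" );
        }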

        if( wasError ) {
            String savedMessage = errorMessage;
            reset();
            throw new SyncDAOException( savedMessage );
        }

        String savedData = data;
        reset();

        return savedData;
    }

    private synchronized void reset() {
        data = "";
        errorMessage = "";
        wasError = false;
    }

    private synchronized void dataReady(String d) {
        data = d;
        try {
            blockingQueue.put(data);
        } catch(InterruptedException ignore) {}
    }

    private synchronized void error(String message) {
        errorMessage = message;
        wasError = true;
        try {
            blockingQueue.put(message);
        } catch(InterruptedException ignore) {}
    }
}
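
Since the queue already carries an object from the callback thread to the caller, one variation is to put the outcome itself in the queue and drop the shared member variables. The sketch below is not the code from the GitHub project. The Outcome class, the QueueHandoffDemo name, the fail flag, and the use of IllegalStateException in place of SyncDAOException are all assumptions made to keep the example self-contained.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

class QueueHandoffDemo {

    // Hypothetical holder for the outcome of one fetch: either data or an error message.
    static final class Outcome {
        final String data;
        final String error;
        Outcome(String data, String error) { this.data = data; this.error = error; }
    }

    // Stand-in for AsyncDAO.fetchData(); the fail flag is an assumption for the demo.
    static void fetchDataAsync(boolean fail, Consumer<String> ready, Consumer<String> failed) {
        new Thread(() -> {
            try {
                Thread.sleep(100);
                if (fail) {
                    failed.accept("simulated failure");
                } else {
                    ready.accept("here is the data");
                }
            } catch (InterruptedException e) {
                failed.accept("fetch was interrupted");
            }
        }).start();
    }

    // Synchronous wrapper: a fresh queue per call, so no state is shared between calls.
    static String fetchData(boolean fail) {
        BlockingQueue<Outcome> queue = new LinkedBlockingQueue<>(1);
        fetchDataAsync(fail,
            d -> queue.offer(new Outcome(d, null)),
            e -> queue.offer(new Outcome(null, e)));
        try {
            Outcome outcome = queue.take(); // blocks until a callback offers an Outcome
            if (outcome.error != null) {
                throw new IllegalStateException(outcome.error);
            }
            return outcome.data;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for data", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("data retrieved=" + fetchData(false));
        try {
            fetchData(true);
        } catch (IllegalStateException e) {
            System.out.println("fetch failed: " + e.getMessage());
        }
    }
}
```

With the result travelling through the queue, there is nothing to reset() between calls, and the callbacks no longer need to be synchronized.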

Wait / Notify

Because the BlockingQueue implementation is built on a single item, it's a special case.  Another implementation can be built with the wait() and notify() methods of Object.  The caller waits on the current object's monitor with a 30-second timeout; wait() releases the monitor and suspends the calling thread until one of the callbacks calls notify() or the timeout elapses.  I like this approach because of the timeout.  I can time out my BlockingQueue implementation too by swapping take() for poll() with a timeout.


public class SyncDAOWrapperWaitNotify {

    String data = "";
    String errorMessage = "";
    boolean dataAvailable = false;
    boolean wasError = false;

    AsyncDAO ad = new AsyncDAO();

    public String fetchData() throws SyncDAOException {

        ad.fetchData(
                    (d) -> dataReady(d),
                    (e) -> error(e)
        );

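        synchronized(this) {
            long deadline = System.currentTimeMillis() + 30000;
            // the loop guards against spurious wakeups and a callback that fires before wait()
            while( !dataAvailable && !wasError ) {
                long remaining = deadline - System.currentTimeMillis();
                if( remaining <= 0 ) {
                    throw new SyncDAOException( "timed out waiting for data" );
                }
                try {
                    wait(remaining);
                } catch(InterruptedException exc) {
                    Thread.currentThread().interrupt();
                    throw new SyncDAOException( "interrupted while waiting for data" );
                }
            }
        }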

        if( wasError ) {
            String savedMessage = errorMessage;
            reset();
            throw new SyncDAOException( savedMessage );
        }

        String savedData = data;
        reset();

        return savedData;
    }

    private synchronized void reset() {
        data = "";
        errorMessage = "";
        dataAvailable = false;
        wasError = false;
    }

    private synchronized void dataReady(String d) {
        data = d;
        dataAvailable = true;
        notify();
    }

    private synchronized void error(String message) {
        errorMessage = message;
        wasError = true;
        notify();
    }
}
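
The timeout variant of the BlockingQueue approach mentioned at the start of this section can be sketched as follows. This is a minimal, standalone illustration and is not taken from the GitHub project. The class name PollTimeoutDemo, the waitFor() helper, and the choice to return null on a timeout are assumptions.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class PollTimeoutDemo {

    // Waits up to timeoutMillis for a value that a background thread produces
    // after delayMillis. Returns null if the timeout elapses first.
    static String waitFor(long delayMillis, long timeoutMillis) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);

        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
                queue.offer("here is the data");
            } catch (InterruptedException ignored) {
                // nothing to deliver if the producer is interrupted
            }
        });
        producer.setDaemon(true); // don't keep the JVM alive after a timeout
        producer.start();

        try {
            // poll(timeout, unit) blocks like take(), but gives up after the timeout
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("fast producer: " + waitFor(50, 1000));
        System.out.println("slow producer: " + waitFor(1000, 50));
    }
}
```

A real wrapper would probably turn the null into an exception so the caller can tell a timeout apart from an empty result.
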
Notice that I'm using the same AsyncDAO class for all of the operations.  Recall the reason I programmed this was to wrap up another partner's call.

Finally, this is the main that I've been using for all of the implementations.

public class SyncDAOMain {

    public static void main(String[] args) {

        System.out.println("****** blocking queue");
        SyncDAOWrapper dao = new SyncDAOWrapper();
        try {
            for( int i=0; i<10; i++ ) {
                String data = dao.fetchData();
                System.out.println("data retrieved=" + data);
            }
        } catch(SyncDAOException exc) {
            exc.printStackTrace();
        }

        System.out.println("****** busy loop");
        AsyncDAO.reset();

        SyncDAOWrapperBusy daoBusy = new SyncDAOWrapperBusy();
        try {
            for( int i=0; i<10; i++ ) {
                String data = daoBusy.fetchData();
                System.out.println("data retrieved=" + data);
            }
        } catch(SyncDAOException exc) {
            exc.printStackTrace();
        }

        System.out.println("****** wait/notify");
        AsyncDAO.reset();

        SyncDAOWrapperWaitNotify daoWaitNotify = new SyncDAOWrapperWaitNotify();
        try {
            for( int i=0; i<10; i++ ) {
                String data = daoWaitNotify.fetchData();
                System.out.println("data retrieved=" + data);
            }
        } catch(SyncDAOException exc) {
            exc.printStackTrace();
        }

    }
}


If you're a JavaFX developer, use the Task class to keep the UI responsive during long-running operations.  However, once you have the bottleneck of the FX thread resolved, you may want to deal with synchronous operations within the Task.  The BlockingQueue and Wait / Notify code examples are the basis for an integration I did with a webapp.  It became inconvenient to pass around snippets of code and their context even with the syntactic sugar of Java 8 lambdas.  While Java developers probably don't use asynchronous operations enough because of the unfamiliar libraries, they aren't a one-size-fits-all proposition, especially if readability suffers.
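
For comparison, Java 8's CompletableFuture can do the same callback-to-blocking bridge with a built-in timeout. This is a different technique from the BlockingQueue and wait / notify wrappers in this post, and it is not in the GitHub project. The FutureBridgeDemo name, the fail flag, and the use of IllegalStateException in place of SyncDAOException are assumptions for the sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

class FutureBridgeDemo {

    // Stand-in for AsyncDAO.fetchData(); the fail flag is an assumption for the demo.
    static void fetchDataAsync(boolean fail, Consumer<String> ready, Consumer<String> failed) {
        new Thread(() -> {
            try {
                Thread.sleep(100);
                if (fail) {
                    failed.accept("simulated failure");
                } else {
                    ready.accept("here is the data");
                }
            } catch (InterruptedException e) {
                failed.accept("fetch was interrupted");
            }
        }).start();
    }

    // Synchronous wrapper: the callbacks complete the future, the caller blocks on get().
    static String fetchData(boolean fail) {
        CompletableFuture<String> future = new CompletableFuture<>();
        fetchDataAsync(fail,
            future::complete,
            msg -> future.completeExceptionally(new IllegalStateException(msg)));
        try {
            return future.get(30, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            // unwrap the exception supplied by the failure callback
            throw new IllegalStateException(e.getCause().getMessage(), e.getCause());
        } catch (TimeoutException e) {
            throw new IllegalStateException("timed out waiting for data", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for data", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("data retrieved=" + fetchData(false));
        try {
            fetchData(true);
        } catch (IllegalStateException e) {
            System.out.println("fetch failed: " + e.getMessage());
        }
    }
}
```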

EPILOGUE - Sept. 7, 2015

I went with the wait / notify approach since I anticipate only one object needing to block at a time.