JBoss remoting Callbacks


In the first tutorial we saw how a client-server application can be written with JBoss Remoting. Now we'll see how the client can send requests asynchronously, that is, without having to block and wait for the server to finish processing. At the same time we'll explore how the client can listen for events that occur on the server side. Both can be achieved using callbacks.

Within remoting, there are two approaches in which a callback can be received:

The first is to actively ask for callback messages from the remoting server, which is called a pull callback (since the client is pulling the callbacks from the server).
The second is to have the server send the callbacks to the client as they are generated, which is called a push callback.
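
To make the distinction concrete, here is a minimal, library-free sketch. It does not use the JBoss Remoting API: `CallbackModesSketch`, `PullMailbox` and `PushChannel` are hypothetical names chosen only for illustration. In pull mode the server side merely queues messages until the client asks for them; in push mode the server side calls the client's handler as soon as a message is produced.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Library-free illustration of pull vs. push delivery.
// All names here are hypothetical; none of this is JBoss Remoting API.
class CallbackModesSketch {

    // Pull mode: the "server" stores messages until the client collects them.
    static class PullMailbox {
        private final List<String> pending = new ArrayList<>();

        synchronized void publish(String message) {
            pending.add(message);
        }

        // The client calls this when it chooses to; the queue is emptied.
        synchronized List<String> drain() {
            List<String> copy = new ArrayList<>(pending);
            pending.clear();
            return copy;
        }
    }

    // Push mode: the "server" invokes the client's handler right away.
    static class PushChannel {
        private final List<Consumer<String>> handlers = new ArrayList<>();

        synchronized void register(Consumer<String> handler) {
            handlers.add(handler);
        }

        synchronized void publish(String message) {
            for (Consumer<String> handler : handlers) {
                handler.accept(message);
            }
        }
    }

    public static void main(String[] args) {
        PullMailbox mailbox = new PullMailbox();
        mailbox.publish("Callback 1");
        mailbox.publish("Callback 2");
        // Nothing reaches the client until it asks.
        System.out.println("Pulled: " + mailbox.drain());

        PushChannel channel = new PushChannel();
        channel.register(msg -> System.out.println("Pushed: " + msg));
        channel.publish("Callback 3"); // delivered during this call
    }
}
```

The trade-off is visible even at this scale: the pull client decides when to pay the cost of collecting messages, while the push client must be reachable and ready to handle a message at any time.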

Let's look at an example. First we need to let our server generate callbacks. For this purpose we register with the server Connector a class that implements ServerInvocationHandler:

 connector = new Connector();
 connector.setInvokerLocator(locator.getLocatorURI());
 connector.create();
 SampleInvocationHandler invocationHandler = new SampleInvocationHandler();
 connector.addInvocationHandler("sample", invocationHandler);

The SampleInvocationHandler class also implements the Runnable interface, so that callbacks are generated in its run() method. Each time a callback is generated, the handler iterates over the registered InvokerCallbackHandler objects, which represent the listeners added on the client side, and delegates the handling of the callback to each of them. So handling a callback looks as if the server were sending an invocation request back to the client.

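        public void run()
        {
            do
            {
                while(shouldGenerateCallbacks)
                {
                    // Build one callback and hand it to every registered listener
                    Callback callback = new Callback("Callback " + Server.callbackCounter++ + ": This is the payload of callback invocation.");
                    for(Iterator itr = listeners.iterator(); itr.hasNext();)
                    {
                        InvokerCallbackHandler callbackHandler = (InvokerCallbackHandler)itr.next();
                        try
                        {
                            callbackHandler.handleCallback(callback);
                        }
                        catch(HandleCallbackException e)
                        {
                            e.printStackTrace();
                        }
                    }
                    // (one-second pause between callbacks omitted; see the full Server listing)
                }
                // (one-second idle pause omitted; see the full Server listing)
            } while(true);
        }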
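
One thing to watch in the loop above: the listener list is read by the callback thread, while addListener and removeListener are called from other threads. With a plain ArrayList this can lead to a ConcurrentModificationException. Below is a minimal sketch of one possible remedy, using the JDK's `CopyOnWriteArrayList`. `ListenerRegistry` and `PayloadListener` are hypothetical stand-ins, not part of JBoss Remoting.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for a callback handler; not the JBoss Remoting interface.
interface PayloadListener {
    void handle(String payload);
}

// Sketch of a listener registry that is safe to iterate while it is modified.
class ListenerRegistry {
    // Iteration works on a snapshot, so concurrent add/remove cannot break it.
    private final List<PayloadListener> listeners = new CopyOnWriteArrayList<>();

    void add(PayloadListener listener) { listeners.add(listener); }
    void remove(PayloadListener listener) { listeners.remove(listener); }
    boolean isEmpty() { return listeners.isEmpty(); }

    // Deliver one payload to every listener and return how many accepted it.
    // A listener that throws does not prevent delivery to the others.
    int fire(String payload) {
        int delivered = 0;
        for (PayloadListener listener : listeners) {
            try {
                listener.handle(payload);
                delivered++;
            } catch (RuntimeException e) {
                e.printStackTrace();
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        ListenerRegistry registry = new ListenerRegistry();
        // This listener removes itself while the registry is iterating.
        PayloadListener[] self = new PayloadListener[1];
        self[0] = payload -> registry.remove(self[0]);
        registry.add(self[0]);
        registry.add(payload -> System.out.println("Got: " + payload));
        System.out.println("Delivered to " + registry.fire("Callback 1") + " listeners");
    }
}
```

Copy-on-write suits this case because listeners are added and removed rarely but iterated on every callback. For a registry that changes very often, a different structure may be a better fit.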

Moving on to the client side, let's examine the pull strategy, which is implemented in this method:

 public void testPullCallback()
        throws Throwable
    {
        InvokerCallbackHandler callbackHandler =
            new MyCallbackHandler();

        remotingClient.addListener(callbackHandler);
        
        Thread.sleep(2000L);
        makeInvocation();
        // Collect the callbacks queued on the server for this handler
        List callbacks = remotingClient.getCallbacks(callbackHandler);
        for(Iterator itr = callbacks.iterator(); itr.hasNext();)
        {
            Callback callbackObject = (Callback)itr.next();
            System.out.println("Pull Callback value = " + callbackObject.getCallbackObject());
        }

        remotingClient.removeListener(callbackHandler);
    }

As you can see from this method, pull callbacks are synchronous: the client has to periodically stop what it is doing and poll the server in order to collect the callbacks that have queued up. The synchronous call is getCallbacks, which collects the pending callbacks from the server.
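
The polling pattern described here can be sketched independently of the library. The helper below is hypothetical (`PullPoller` and `pollUntil` are not JBoss Remoting names): it repeatedly calls a fetch function until enough items have been collected or a maximum number of attempts has been reached.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Sketch of a client-side polling loop; PullPoller is a hypothetical helper.
class PullPoller {

    // Calls fetch repeatedly until at least `wanted` items are collected
    // or `maxPolls` attempts were made, pausing between attempts.
    static <T> List<T> pollUntil(Supplier<List<T>> fetch, int wanted, int maxPolls, long pauseMillis) {
        List<T> collected = new ArrayList<>();
        for (int attempt = 0; attempt < maxPolls && collected.size() < wanted; attempt++) {
            collected.addAll(fetch.get());
            if (collected.size() < wanted) {
                try {
                    Thread.sleep(pauseMillis);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop polling.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return collected;
    }

    public static void main(String[] args) {
        // Simulated server queue: every poll returns one new callback.
        int[] counter = {0};
        Supplier<List<String>> fetch = () -> {
            List<String> batch = new ArrayList<>();
            batch.add("Callback " + (++counter[0]));
            return batch;
        };
        System.out.println(pollUntil(fetch, 3, 10, 10L));
    }
}
```

In the tutorial's client, the fetch function would presumably wrap remotingClient.getCallbacks(callbackHandler) and translate its checked Throwable into an unchecked exception; that wiring is left out here.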

Here's the push callback method:

 public void testPushCallback()
        throws Throwable
    {
        String callbackLocatorURI = transport + "://" + host + ":" + (port + 1);
        InvokerLocator callbackLocator = new InvokerLocator(callbackLocatorURI);
        setupServer(callbackLocator);
        InvokerCallbackHandler callbackHandler =
            new MyCallbackHandler();

        String callbackHandleObject = "myCallbackHandleObject";
        remotingClient.addListener(callbackHandler, callbackLocator, callbackHandleObject);
        Thread.sleep(2000L);
        remotingClient.removeListener(callbackHandler);
        callbackServerConnector.stop();
        callbackServerConnector.destroy();
    }   

Push callbacks, by contrast, are asynchronous, because the client is notified as soon as the server has a callback available, without having to ask for it. As you can see, the listener is added to the Client after the InvokerCallbackHandler has been instantiated, together with the locator of a callback server that the client starts on its own side. When the server triggers a callback, the handleCallback method of the client's InvokerCallbackHandler is invoked.
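
The Thread.sleep(2000L) in testPushCallback() simply gives the server some time to push callbacks before the listener is removed. If the client needs to wait for a specific number of callbacks instead, a latch is one option. The sketch below assumes the handler is called on a different thread from the one that waits. `LatchedHandler` is a hypothetical name, and a plain thread plays the role of the server.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch: wait for a known number of pushed callbacks instead of sleeping.
// LatchedHandler is a hypothetical name, not a JBoss Remoting class.
class LatchedHandler {
    private final CountDownLatch latch;
    private final List<String> received = new CopyOnWriteArrayList<>();

    LatchedHandler(int expectedCallbacks) {
        this.latch = new CountDownLatch(expectedCallbacks);
    }

    // Called by the "server" thread each time a callback is pushed.
    void handleCallback(String payload) {
        received.add(payload);
        latch.countDown();
    }

    // Returns true if all expected callbacks arrived before the timeout.
    boolean await(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    List<String> received() {
        return received;
    }

    public static void main(String[] args) {
        LatchedHandler handler = new LatchedHandler(3);
        // A plain thread stands in for the server pushing callbacks.
        Thread server = new Thread(() -> {
            for (int i = 1; i <= 3; i++) {
                handler.handleCallback("Callback " + i);
            }
        });
        server.start();
        boolean complete = handler.await(5, TimeUnit.SECONDS);
        System.out.println("All callbacks received: " + complete + " " + handler.received());
    }
}
```

Compared with a fixed sleep, the waiting thread resumes as soon as the last expected callback arrives, and the timeout still bounds the wait if the server never sends it.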

This is the full Server example (from JBoss remoting samples):

package test;

import java.io.PrintStream;
import java.util.*;
import javax.management.MBeanServer;
import org.jboss.remoting.*;
import org.jboss.remoting.callback.*;
import org.jboss.remoting.transport.Connector;

public class Server
{
    public static class SampleInvocationHandler
        implements ServerInvocationHandler, Runnable
    {

        public Object invoke(InvocationRequest invocation)
            throws Throwable
        {
            System.out.println("Invocation request is: " + invocation.getParameter());
            return "This is the return to SampleInvocationHandler invocation";
        }

        public void addListener(InvokerCallbackHandler callbackHandler)
        {
            System.out.println("Adding callback listener.");
            listeners.add(callbackHandler);
            shouldGenerateCallbacks = true;
        }

        public void removeListener(InvokerCallbackHandler callbackHandler)
        {
            System.out.println("Removing callback listener.");
            listeners.remove(callbackHandler);
            if(listeners.size() == 0)
                shouldGenerateCallbacks = false;
        }

        public void run()
        {
            do
            {
                while(shouldGenerateCallbacks)
                {
                    Callback callback = new Callback("Callback " + Server.callbackCounter++ + ": This is the payload of callback invocation.");
                    for(Iterator itr = listeners.iterator(); itr.hasNext();)
                    {
                        InvokerCallbackHandler callbackHandler = (InvokerCallbackHandler)itr.next();
                        try
                        {
                            callbackHandler.handleCallback(callback);
                        }
                        catch(HandleCallbackException e)
                        {
                            e.printStackTrace();
                        }
                    }

                    // Pause between two callbacks
                    try
                    {
                        Thread.sleep(1000L);
                    }
                    catch(InterruptedException e) { }
                }
                // Idle pause while no listener is registered
                try
                {
                    Thread.sleep(1000L);
                }
                catch(InterruptedException e) { }
            } while(true);
        }

        public void setMBeanServer(MBeanServer mbeanserver)
        {
        }

        public void setInvoker(ServerInvoker serverinvoker)
        {
        }

        private List listeners;
        // volatile: written by addListener/removeListener, read by the callback thread
        private volatile boolean shouldGenerateCallbacks;

        public SampleInvocationHandler()
        {
            listeners = new ArrayList();
            shouldGenerateCallbacks = false;
            Thread callbackThread = new Thread(this);
            callbackThread.setDaemon(true);
            callbackThread.start();
        }
    }


    public Server()
    {
        connector = null;
    }

    public void setupServer(String locatorURI)
        throws Exception
    {
        InvokerLocator locator = new InvokerLocator(locatorURI);
        System.out.println("Starting remoting server with locator uri of: " + locatorURI);
        connector = new Connector();
        connector.setInvokerLocator(locator.getLocatorURI());
        connector.create();
        SampleInvocationHandler invocationHandler = new SampleInvocationHandler();
        connector.addInvocationHandler("sample", invocationHandler);
        connector.start();
    }

    public void shutdownServer()
    {
        connector.stop();
        connector.destroy();
    }

    public static void main(String args[])
    {
        if(args != null && args.length == 2)
        {
            transport = args[0];
            port = Integer.parseInt(args[1]);
        }
        String locatorURI = transport + "://" + host + ":" + port;
        Server server = new Server();
        try
        {
            server.setupServer(locatorURI);
            do
                Thread.sleep(1000L);
            while(true);
        }
        catch(Exception e)
        {
            e.printStackTrace();
        }
    }

    private static String transport = "socket";
    private static String host = "localhost";
    private static int port = 5400;
    private static int callbackCounter = 1;
    private Connector connector;
    private static final String RESPONSE_VALUE = "This is the return to SampleInvocationHandler invocation";
}

This is the full Client example (from JBoss remoting samples):

package test;

import java.io.PrintStream;
import java.util.Iterator;
import java.util.List;
import org.jboss.remoting.Client;
import org.jboss.remoting.InvocationRequest;
import org.jboss.remoting.InvokerLocator;
import org.jboss.remoting.callback.Callback;
import org.jboss.remoting.callback.HandleCallbackException;
import org.jboss.remoting.callback.InvokerCallbackHandler;
import org.jboss.remoting.transport.Connector;

public class CallBackClient
{

    class MyCallbackHandler implements InvokerCallbackHandler {

        // Helper that is not part of InvokerCallbackHandler
        public void handleCallback(InvocationRequest invocation) {
            Object response = invocation.getParameter();
            System.out.println("The server returned: " + response);
        }

        // Invoked when a callback is delivered to this handler
        public void handleCallback(Callback arg0) throws HandleCallbackException {
            System.out.println("MyCallbackHandler Here we handle the callback");
        }
    }

    public void createRemotingClient(String locatorURI)
        throws Exception
    {
        InvokerLocator locator = new InvokerLocator(locatorURI);
        System.out.println("Calling remoting server with locator uri of: " + locatorURI);
        remotingClient = new Client(locator);
        remotingClient.connect();
    }

    public void makeInvocation()
        throws Throwable
    {
        Object response = remotingClient.invoke("Do something", null);
        System.out.println("Invocation response: " + response);
    }

    public void testPullCallback()
        throws Throwable
    {
        InvokerCallbackHandler callbackHandler =
            new MyCallbackHandler();

        remotingClient.addListener(callbackHandler);
        
        Thread.sleep(2000L);
        makeInvocation();
        // Collect the callbacks queued on the server for this handler
        List callbacks = remotingClient.getCallbacks(callbackHandler);
        for(Iterator itr = callbacks.iterator(); itr.hasNext();)
        {
            Callback callbackObject = (Callback)itr.next();
            System.out.println("Pull Callback value = " + callbackObject.getCallbackObject());
        }

        remotingClient.removeListener(callbackHandler);
    }

    public void testPushCallback()
        throws Throwable
    {
        String callbackLocatorURI = transport + "://" + host + ":" + (port + 1);
        InvokerLocator callbackLocator = new InvokerLocator(callbackLocatorURI);
        setupServer(callbackLocator);
        InvokerCallbackHandler callbackHandler =
            new MyCallbackHandler();

        String callbackHandleObject = "myCallbackHandleObject";
        remotingClient.addListener(callbackHandler, callbackLocator, callbackHandleObject);
        Thread.sleep(2000L);
        remotingClient.removeListener(callbackHandler);
        callbackServerConnector.stop();
        callbackServerConnector.destroy();
    }

    public void setupServer(InvokerLocator locator)
        throws Exception
    {
        System.out.println("Starting remoting server with locator uri of: " + locator);
        callbackServerConnector = new Connector();
        callbackServerConnector.setInvokerLocator(locator.getLocatorURI());
        callbackServerConnector.start();
    }

    public static void main(String args[])
    {
        if(args != null && args.length == 2)
        {
            transport = args[0];
            port = Integer.parseInt(args[1]);
        }
        String locatorURI = transport + "://" + host + ":" + port;
        CallBackClient client = new CallBackClient();
        try
        {
            client.createRemotingClient(locatorURI);
            // client.testPullCallback();
            client.testPushCallback();
        }
        catch(Throwable e)
        {
            e.printStackTrace();
        }
    }

    private static String transport = "socket";
    private static String host = "localhost";
    private static int port = 5400;
    private Client remotingClient;
    private Connector callbackServerConnector;
    public void handleCallback(Callback arg0) throws HandleCallbackException {
         System.out.println(" Here we handle the callback");

    }

}
