Thursday, October 16, 2014

TCP Load Balancing with HAProxy

HAProxy is a free, very fast and reliable solution offering high availability, load balancing, and proxying for TCP and HTTP-based applications. It is one of the easiest load balancers to configure when it comes to TCP load balancing.

In this post, we will look at an example HAProxy config, and example Java code based on the standard "Knock knock server-client" code. We are going to run two TCP servers, which will be fronted by an HAProxy instance, and distribute the load in a round-robin fashion. The TCP servers will be running on localhost ports 4444 & 5555 respectively.

Here's the game plan

HAProxy Configuration

Once you have installed HAProxy, you need to create a config file with the following content.

frontend localnodes
    bind *:8080
    mode tcp
    default_backend nodes
    timeout client          1m

backend nodes
    mode tcp
    balance roundrobin
    server web01
    server web02
    timeout connect        10s
    timeout server          1m

Note that the default_backend entry in the frontend section of the configuration points to the backend section named nodes.  HAProxy will be proxying traffic on the 8080 port. Our sample knock-knock client will connect to this 8080 port. The rest of the configuration is self explanatory.

Once you have the configuration file created, start HAProxy from its home directory as follows:

sbin/haproxy -f haproxy.conf

If everything is running fine, you should not see any errors or warnings when you run the above command.

Java Code

Now let us look at the relevant Java code. I am using the same Knock knock example from the Oracle Java SE documentation, with slight modification which enable it to serve concurrent clients, and  print a message when a client connects, in order to demonstrate TCP load balancing in action.



public class Server {
    public static void main(String[] args) throws IOException {

        if (args.length != 1) {
            System.err.println("Usage: java KnockKnockServer <port number>");

        int portNumber = Integer.parseInt(args[0]);
        ServerSocket serverSocket = new ServerSocket(portNumber);
        while (true) {
            Socket clientSocket = serverSocket.accept();
            serveRequest(clientSocket, portNumber);

    private static void serveRequest(final Socket clientSocket, final int portNumber) throws IOException {
        Runnable runnable = new Runnable() {
            public void run() {
                try {
                    PrintWriter out =
                            new PrintWriter(clientSocket.getOutputStream(), true);
                    BufferedReader in = new BufferedReader(
                            new InputStreamReader(clientSocket.getInputStream()));
                    String inputLine, outputLine;
                    System.out.println("Got message...");

                    // Initiate conversation with client
                    KnockKnockProtocol kkp = new KnockKnockProtocol();
                    outputLine = kkp.processInput(null);

                    while ((inputLine = in.readLine()) != null) {
                        outputLine = kkp.processInput(inputLine);
                        if (outputLine.equals("Bye."))
                } catch (IOException e) {
                    System.out.println("Exception caught when trying to listen on port "
                            + portNumber + " or listening for a connection");
        new Thread(runnable).start();



public class Client {
    public static void main(String[] args) throws IOException {

        if (args.length != 2) {
                    "Usage: java EchoClient  ");

        String hostName = args[0];
        int portNumber = Integer.parseInt(args[1]);

        try {
            Socket kkSocket = new Socket(hostName, portNumber);
            PrintWriter out = new PrintWriter(kkSocket.getOutputStream(), true);
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(kkSocket.getInputStream()));
            BufferedReader stdIn =
                    new BufferedReader(new InputStreamReader(;
            String fromServer;
            String fromUser;

            while ((fromServer = in.readLine()) != null) {
                System.out.println("Server: " + fromServer);
                if (fromServer.equals("Bye."))

                fromUser = stdIn.readLine();
                if (fromUser != null) {
                    System.out.println("Client: " + fromUser);
        } catch (UnknownHostException e) {
            System.err.println("Don't know about host " + hostName);
        } catch (IOException e) {
            System.err.println("Couldn't get I/O for the connection to " +


public class KnockKnockProtocol {
    private static final int WAITING = 0;
    private static final int SENTKNOCKKNOCK = 1;
    private static final int SENTCLUE = 2;
    private static final int ANOTHER = 3;

    private static final int NUMJOKES = 5;

    private int state = WAITING;
    private int currentJoke = 0;

    private String[] clues = { "Turnip", "Little Old Lady", "Atch", "Who", "Who" };
    private String[] answers = { "Turnip the heat, it's cold in here!",
            "I didn't know you could yodel!",
            "Bless you!",
            "Is there an owl in here?",
            "Is there an echo in here?" };

    public String processInput(String theInput) {
        String theOutput = null;

        if (state == WAITING) {
            theOutput = "Knock! Knock!";
            state = SENTKNOCKKNOCK;
        } else if (state == SENTKNOCKKNOCK) {
            if (theInput.equalsIgnoreCase("Who's there?")) {
                theOutput = clues[currentJoke];
                state = SENTCLUE;
            } else {
                theOutput = "You're supposed to say \"Who's there?\"! " +
                        "Try again. Knock! Knock!";
        } else if (state == SENTCLUE) {
            if (theInput.equalsIgnoreCase(clues[currentJoke] + " who?")) {
                theOutput = answers[currentJoke] + " Want another? (y/n)";
                state = ANOTHER;
            } else {
                theOutput = "You're supposed to say \"" +
                        clues[currentJoke] +
                        " who?\"" +
                        "! Try again. Knock! Knock!";
                state = SENTKNOCKKNOCK;
        } else if (state == ANOTHER) {
            if (theInput.equalsIgnoreCase("y")) {
                theOutput = "Knock! Knock!";
                if (currentJoke == (NUMJOKES - 1))
                    currentJoke = 0;
                state = SENTKNOCKKNOCK;
            } else {
                theOutput = "Bye.";
                state = WAITING;
        return theOutput;

Running the Server 

The server can be run by providing the server port as an argument to the Java program. We will start the two TCP servers as follows:

      java -cp classes Server 4444 

      java -cp classes Server 5555 

Running the client 

Start two client instances. The client can be started by providing the server IP address and port as follows:

      java -cp Client 8080

Note that we are providing the server port of the HAProxy frontend node, and not the actual port of any of the TCP servers.


Try running two different clients should get connected to the two TCP servers. If TCP load balancing is properly working, you should see the following message printed on the console of the two servers.

       Got message...

Friday, November 22, 2013

WSO2 Multi-tenant Cache: JSR-107 (JCache) implementation based on Hazelcast

With the WSO2 Carbon 4.2.0 kernel release, we have moved from Tribes to Hazelcast, and have also introduced a new JSR-107 (JCache) caching implementation.

Core Features

  1. Local & distributed mode
  2. L1 & L2 caching model for distributed mode
  3. Multi-tenancy

This cache supports both local mode & distributed mode. If (Axis2) clustering is enabled, the cache works as a distributed cache. It is simply using a Hazelcast distributed map per cache. If clustering is not enabled, then the same caching implementation works as a local cache. There is no code change required in the code that uses JCache APIs to switch from local cache mode to distributed cache mode.

In addition, in distributed cache mode, in order to improve performance, we have an L1 cache, which is simply implemented using HashMaps, and the L2 cache is implemented using Hazelcast distributed maps. So, we first check the L1 cache, and if there is a cache miss, we go to the L2 cache. If the value is located in the L2 cache, then we also store it in the L1 cache. This is illustrated in the sequence diagram below.

In distributed mode, If a cache entry is removed, invalidated or changed, then Hazelcast listeners we have registered get triggered on each & every Hazelcast cluster member. That will result in the values in the L1 cache & L2 cache being removed or updated. The following sequence diagram shows

Each cache operation is automatically tenant qualified. The tenant is derived from the thread context using the CarbonContext tenant ID & tenant domain. Caches created by a particular tenant can only be accessed by that tenant. This ensures that tenants cannot gain illegal access to caches owned by other tenants. 

Code Examples

The following examples illustrate how the JCache APIs can be used to acquire the cache & carry out caching operations.

Example 1: Simple cache creation

        CacheManager cacheManager1 =   Caching.getCacheManagerFactory().getCacheManager("sampleCacheManager");

        Cache<String, Integer> cache = cacheManager.getCache("sampleCache");

        int value1 = 9876;

        cache.put(key, value1);

        int value = cache.get(key).intValue()

Example 2: Using cache configuration with a custom cache expiry

CacheManager cacheManager = Caching.getCacheManagerFactory().getCacheManager("test");

        String cacheName = "cacheXXX";

        cache = cacheManager.<String, Integer>createCacheBuilder(cacheName).

                setExpiry(CacheConfiguration.ExpiryType.MODIFIED, new CacheConfiguration.Duration(TimeUnit.SECONDS, 10)).


        int value = 9876;

        cache.put(key, value);

        assertEquals(cache.get(key).intValue(), value);

The relevant code is available at the following SVN locations;

Saturday, July 06, 2013

Travel Dua

اللَّهُ أَكْبَرُ، اللَّهُ أَكْبَرُ، اللَّهُ أَكْبَرُ سُبْحَانَ الَّذِي سَخَّرَ لَنَا هَذَا وَمَا كُنَّا لَهُ مُقْرِنِينَ وَإِنَّا إِلَى رَبِّنَا لَمُنْقَلِبُونَ، اللَّهُمَّ إِنَّا نَسْأَلُكَ فِي سَفْرِنَا هَذَا الْبِرَّ وَالتَّقْوَى ، وَمِنَ الْعَمَلِ مَا تَرْضَى ، اللَّهُمَّ هَوَّنْ عَلَيْنَا سَفْرِنَا هَذَا وَاطْوَعَّنَّا بَعْدهُ ، اللَّهُمَّ أَنْتَ الصَّاحِبُ فِي السَّفَرِ، وَالْخَلِيفَةُ فِي الأَهْلِ، اللَّهُمَّ إِنِّي أَعُوْذُ بِكَ مِنْ وَعْثَاءِ السَّفَرِ، وَكآبَةِ الْمَنْظَرِ وَسُوءِ المُنْقَلَبِ فِي الْمَالِ وَالأَهْلِ + آيِبُونَ تَائْبُونَ عَابِدُونَ لِرَبِّنَا حَامِدُونَ

Friday, June 07, 2013

WSO2 Carbon Port Offset

The portOffset features in WSO2 Carbon allows you to run multiple WSO2 products, multiple instances of WSO2 products, or WSO2 product clusters on the same machine or VM.

The port offset defines the number by which all ports defined in the runtime such as the HTTP/S ports will be offset. e.g. if the HTTP port is defined as 9763 & portOffset is 1, the effective HTTP port will be 9764. 

The portOffset can be specified as a System property or it can be defined in the carbon.xml file

PortOffset can be passed in during server startup as follows:

./ -DportOffset=3

PortOffset can be set in the carbon.xml as follows, under the Ports section:


Wednesday, May 29, 2013

WSO2 EIP (Enterprise Integration Patterns)

WSO2 ESB provides the capability of implementing all major enterprise integration patterns. For more details, take a look at the WSO2 ESB Enterprise Integration Patterns documentation 

Currently the WSO2 EIP catalog contains the following EIPs.

Messaging Systems

How one application communicates with another using messaging.
How two applications connected by a message channel exchange a piece of information.
How to perform complex processing on a message while maintaining independence and flexibility.
How to decouple individual processing steps so that messages can be passed to different filters depending on conditions.
How systems using different data formats communicate with each other using messaging.
How an application connects to a messaging channel to send and receive messages.

Messaging Channels

How the caller can be sure that exactly one receiver will receive the document or perform the call.

How the sender broadcasts an event to all interested receivers.

How the application sends a data item such that the receiver will know how to process it.
How a messaging receiver gracefully handles a message that makes no sense.
What the messaging system does with a message it cannot deliver.
How the sender ensures delivery of a message, even if the messaging system fails.
How to connect an application to the messaging system to send/receive messages.
How multiple messaging systems can be connected so that messages available on one are also available on the others.
An architecture enabling separate applications to work together in a decoupled fashion such that applications can be easily added or removed without affecting the others.

Message Construction

How messaging can be used to invoke a procedure in another application.
How messaging can be used to transfer data between applications.
How messaging can be used to transmit events from one application to another.
How an application that sends a message gets a response from the receiver.
How a replier knows where to send the reply.
How a requester that has received a reply knows which request the reply is for.
How messaging can transmit an arbitrarily large amount of data.
How a sender indicates when a message should be considered stale and therefore should not be processed.
Format IndicatorHow a message’s data format can be designed to allow for possible future changes.

Message Routing

How to handle a situation when the implementation of a single logical function (such as an inventory check) is spread across multiple physical systems.
How a component avoids receiving uninteresting messages.
How to avoid the dependency of a router in all possible destinations, while maintaining its efficiency.
How to route a message to a list of dynamically specified recipients.
How to process a message if it contains multiple elements, each of which may have to be processed in a different way.
How to combine the results of individual but related messages so that they can be processed as a whole.
How to get a stream of related but out-of-sequence messages back into the correct order.
How to maintain the overall flow when processing a message consisting of multiple elements, each of which may require different processing.

How to maintain the overall flow when a message needs to be sent to multiple recipients, each of which may send a reply.
How to route a message consecutively through a series of steps when the sequence of the steps is not known at design time and may vary for each message.
How to route a message through multiple processing steps, when the required steps may not be known at design time and may not be sequential.
How to decouple the destination of a message from the sender and maintain central control over the flow of messages.

Message Transformation

How existing systems participate in a messaging exchange, which places specific requirements in the message format, such as message header fields or encryption.
How to communicate with another system if the message originator does not have all the required data items available.
How to simplify dealing with a large message when you are interested only in a few data items.
How to reduce the data volume of a message sent across the system without sacrificing information content.
How to process messages that are semantically equivalent but arrive in a different format.
How to minimize dependencies when integrating applications that use different data formats.

Messaging Endpoints

How to encapsulate access to the messaging system from the rest of the application.

Messaging MapperHow to move data between domain objects and the messaging infrastructure, while keeping the two independent of each other.
How a client controls its transactions with the messaging system.
How an application consumes a message when the application is ready.
How an application automatically consumes messages as they become available.
How a messaging client processes multiple messages concurrently.
How multiple consumers on a single channel coordinate their message processing.
How a message consumer selects which messages to receive.
How a subscriber avoids missing messages while it is not listening for them.

How a message receiver deals with duplicate messages.
How an application designs a service to be invoked via both messaging and non-messaging techniques.

System Management

Removes unwanted messages, which can disturb tests or running systems, from a channel.
Administers a messaging system that is distributed across multiple platforms and a wide geographic area.
Routes a message through intermediate steps to perform validation, testing or debugging functions.
Lists all applications that the message passed through since its origination.
Reports against message information without disturbing the loosely coupled and transient nature of a messaging system.
Tracks messages on a service that publishes reply messages to the Return Address specified by the requester.
Ensures the health of message processing components by preventing situations such as garbling outgoing messages due to an internal fault.
Inspects messages that travel on a Point-to-Point Channel.