Wednesday, June 24, 2015

How AWS Clustering Mode in WSO2 Products Works

In a previous blog post, I explained how to configure WSO2 product clusters to work on Amazon Web Services infrastructure. In this post I will explain how it works.

 WSO2 Clustering is based on Hazelcast.

All nodes that share the same set of cluster configuration parameters belong to the same cluster. Hazelcast calls the AWS APIs and gets the set of nodes that satisfy the specified parameters (region, securityGroup, tagKey, tagValue).
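To make the selection step concrete, here is a minimal sketch of that kind of filtering in plain Java. It is not Hazelcast's actual code and makes no AWS calls: InstanceInfo and MemberFilterSketch are hypothetical names, and the instance list is assumed to have been fetched already.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical description of an EC2 instance, for illustration only. */
final class InstanceInfo {
    final String privateIp;
    final String securityGroup;
    final Map<String, String> tags;

    InstanceInfo(String privateIp, String securityGroup, Map<String, String> tags) {
        this.privateIp = privateIp;
        this.securityGroup = securityGroup;
        this.tags = tags;
    }
}

final class MemberFilterSketch {
    /**
     * Returns the private IPs of the instances in the given security group.
     * The tag check is optional: it is applied only when both tagKey and tagValue are non-null.
     */
    static List<String> candidates(List<InstanceInfo> instances, String securityGroup,
                                   String tagKey, String tagValue) {
        List<String> result = new ArrayList<String>();
        for (InstanceInfo instance : instances) {
            if (!securityGroup.equals(instance.securityGroup)) {
                continue; // different security group, so not part of this cluster
            }
            if (tagKey != null && tagValue != null
                    && !tagValue.equals(instance.tags.get(tagKey))) {
                continue; // tag filter was requested and this instance does not match
            }
            result.add(instance.privateIp);
        }
        return result;
    }
}
```

The addresses returned here are only potential members; whether they really run a Hazelcast node is decided later.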

When the Carbon server starts up, it creates a Hazelcast cluster. At that point, it calls the EC2 APIs & gets the list of potential members in the cluster. To call the EC2 APIs, it needs the AWS credentials. This is the only time these credentials are used; the AWS APIs are used only on startup to learn about other potential members in the cluster. It then tries to connect to port 5701 of each potential member, and if 5701 is unavailable, it scans the ports up to 5800. If one of those ports is available, it performs a Hazelcast handshake to make sure that the member is indeed a Hazelcast node, and adds it to the cluster if so.
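The port probing described above can be sketched roughly as follows. This is only an illustration of the idea, not Hazelcast's implementation: PortScanSketch and its method names are made up for this post, and the Hazelcast handshake that follows a successful connection is left out.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortScanSketch {
    static final int FIRST_PORT = 5701;
    static final int LAST_PORT = 5800;

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out or unreachable
        }
    }

    /** Returns the first port in [first, last] that accepts a connection, or -1 if none does. */
    static int firstOpenPort(String host, int first, int last, int timeoutMillis) {
        for (int port = first; port <= last; port++) {
            if (isOpen(host, port, timeoutMillis)) {
                return port;
            }
        }
        return -1;
    }
}
```

A caller would try something like firstOpenPort(ip, PortScanSketch.FIRST_PORT, PortScanSketch.LAST_PORT, 500) for each potential member, and only then attempt the handshake on the port that answered.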

Subsequently, the connections established between members are point-to-point TCP connections. Member failures are detected through a TCP ping. So once member discovery is done, the rest of the interactions in the cluster are the same as when the multicast & WKA (Well Known Address) modes are used.

With that facility, you don't have to provide any member IP addresses or hostnames, which may be impossible on an IaaS such as EC2.

Tuesday, June 23, 2015

AWS Clustering Mode for WSO2 Products

WSO2 Clustering is based on Hazelcast. When WSO2 products are deployed in clustered mode on Amazon EC2, it is recommended to use the AWS clustering mode. As a best practice, add all nodes in a single cluster to the same AWS security group.

To enable AWS clustering mode, you simply have to edit the clustering section in the CARBON_HOME/repository/conf/axis2/axis2.xml file as follows:

Step 1: Enable clustering

<clustering class="org.wso2.carbon.core.clustering.hazelcast.HazelcastClusteringAgent" enable="true">

Step 2: Change membershipScheme to aws

<parameter name="membershipScheme">aws</parameter>

Step 3: Set localMemberPort to 5701

Any value between 5701 & 5800 is acceptable.
<parameter name="localMemberPort">5701</parameter>

Step 4: Define AWS specific parameters

Here you need to define the AWS access key, secret key & security group. The region, tagKey & tagValue are optional, & the region defaults to us-east-1.

<parameter name="accessKey">xxxxxxxxxx</parameter>
<parameter name="secretKey">yyyyyyyyyy</parameter>
<parameter name="securityGroup">a_group_name</parameter>
<parameter name="region">us-east-1</parameter>
<parameter name="tagKey">a_tag_key</parameter>
<parameter name="tagValue">a_tag_value</parameter> 

Provide the AWS credentials & the security group you created as values of the above configuration items.

Step 5: Start the server

If everything went well, you should not see any errors when the server starts up, and you should see the following log message:

[2015-06-23 09:26:41,674]  INFO - HazelcastClusteringAgent Using aws based membership management scheme

and when new members join the cluster, you should see messages such as the following:
[2015-06-23 09:27:08,044]  INFO - AWSBasedMembershipScheme Member joined [5327e2f9-8260-4612-9083-5e5c5d8ad567]: /

and when members leave the cluster, you should see messages such as the following:
[2015-06-23 09:28:34,364]  INFO - AWSBasedMembershipScheme Member left [b2a30083-1cf1-46e1-87d3-19c472bb2007]: /

The complete clustering section in the axis2.xml file is given below:
<clustering class="org.wso2.carbon.core.clustering.hazelcast.HazelcastClusteringAgent" enable="true">
        <parameter name="AvoidInitiation">true</parameter>
        <parameter name="membershipScheme">aws</parameter>
        <parameter name="domain">wso2.carbon.domain</parameter>

        <parameter name="localMemberPort">5701</parameter>
        <parameter name="accessKey">xxxxxxxxxxxx</parameter>
        <parameter name="secretKey">yyyyyyyyyyyy</parameter>
        <parameter name="securityGroup">a_group_name</parameter>
        <parameter name="region">us-east-1</parameter>
        <parameter name="tagKey">a_tag_key</parameter>
        <parameter name="tagValue">a_tag_value</parameter>

        <parameter name="properties">
            <property name="backendServerURL" value="https://${hostName}:${httpsPort}/services/"/>
            <property name="mgtConsoleURL" value="https://${hostName}:${httpsPort}/"/>
            <property name="subDomain" value="worker"/>
        </parameter>
</clustering>


Friday, June 12, 2015

The Isolation Continuum

Tuesday, April 21, 2015

Vega - the Sri Lankan Supercar

For the past 3 months, I have been on sabbatical & away from WSO2. During this period, I got the privilege of working with Team Vega, which is building a fully electric supercar. It was a great opportunity for me since anything to do with vehicles is my passion. The car is being 100% hand built, with around 95% of the components being manufactured locally in Sri Lanka.

Vega Logo

The work on the car is progressing nicely. The images below show the plug which will be used to develop a mold. The mold, in turn, will be used to develop the body panels. The final product will have carbon fiber body panels.

The vehicle chassis is a space frame. This provides the required strength & rigidity, as well as ease of development using simple fabrication methods. The following images show the space frame chassis of Vega.

The following image shows the 450 HP motors coupled with a reduction gear box that powers Vega. This setup will power the rear wheels. The wheels are not coupled in any way, and differential action is controlled via software. You can see the gear box in the center, and the two motors on either side of it.

450 HP motor & motor controller

450 HP motor

One of the highlights of this vehicle is its mechanical simplicity. The vehicle uses very few mechanical parts compared to traditional vehicles, and all the heavy lifting is done by the electronics & software. There will be around 25 micro-controllers that communicate via CAN bus. Most of the actuation & monitoring will be via messaging between these micro-controllers.

The power required is supplied by a series of battery packs. The battery packs are built using 3.3V Lithium Iron Phosphate (LiFePO4) cells. This cell has high chemical stability under varying conditions. There is a battery management system which monitors the batteries & handles charging of the batteries.

A single LiFePO4 cell

Battery module with cooling lines

A single battery module mounted on the Vega chassis

When it comes to electric vehicle charging, there are two leading standards: J1772 & CHAdeMO. The team is also building chargers which will be deployed in various locations. The image below shows a Level 2 charger. There are 3.3 kW & 6.5 kW options available. 1 hour of charging using this charger will give a range of 25 km on average.

The image below shows the super charger that is being built. There are 12.5kW & 25kW options available at the moment. This charger can charge the battery up to 80% of its capacity within a period of 20 minutes.  

With electric vehicles gradually gaining popularity in Sri Lanka & the rest of the world, it has become a necessity to deploy chargers in public locations. This leads to a new problem of managing & monitoring chargers, as well as billing for charging. OCPP (Open Charge Point Protocol) is a specification which has been adopted by a number of countries & organizations to carry out these tasks. The Vega chargers will also support this standard.

CAD diagrams of the Vega supercar (in the background)
Last day with Team Vega
It was a wonderful experience working with Team Vega, even though it was for a very short time, and I am looking forward to the day when I get to test drive the supercar.

Update: Video introducing Vega 

Update: The project has gained so much attention that even the President of Sri Lanka, Maithreepala Sirisena, visited the Vega premises.

President Sirisena checking out a 3D printed replica of Vega


Thursday, October 16, 2014

TCP Load Balancing with HAProxy

HAProxy is a free, very fast and reliable solution offering high availability, load balancing, and proxying for TCP and HTTP-based applications. It is one of the easiest load balancers to configure when it comes to TCP load balancing.

In this post, we will look at an example HAProxy config, and example Java code based on the standard "Knock knock server-client" code. We are going to run two TCP servers, which will be fronted by an HAProxy instance, and distribute the load in a round-robin fashion. The TCP servers will be running on localhost ports 4444 & 5555 respectively.

Here's the game plan

HAProxy Configuration

Once you have installed HAProxy, you need to create a config file with the following content.

frontend localnodes
    bind *:8080
    mode tcp
    default_backend nodes
    timeout client          1m

backend nodes
    mode tcp
    balance roundrobin
    server web01 localhost:4444
    server web02 localhost:5555
    timeout connect        10s
    timeout server          1m

Note that the default_backend entry in the frontend section of the configuration points to the backend section named nodes.  HAProxy will be proxying traffic on the 8080 port. Our sample knock-knock client will connect to this 8080 port. The rest of the configuration is self explanatory.
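The balance roundrobin line means that each new connection goes to the next server in turn. The following Java sketch shows only that selection idea; it is not how HAProxy is implemented, it ignores server weights and health checks, and RoundRobinSketch is a name I made up for the illustration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch {
    private final List<String> servers;
    private final AtomicInteger next = new AtomicInteger(0);

    RoundRobinSketch(List<String> servers) {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("at least one server is required");
        }
        this.servers = servers;
    }

    /** Returns the server that should receive the next connection. */
    String pick() {
        // floorMod keeps the index valid even after the counter overflows to a negative value
        int index = Math.floorMod(next.getAndIncrement(), servers.size());
        return servers.get(index);
    }
}
```

With the two backends from this post, successive calls to pick() would alternate between the server on port 4444 and the one on port 5555.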

Once you have the configuration file created, start HAProxy from its home directory as follows:

sbin/haproxy -f haproxy.conf

If everything is running fine, you should not see any errors or warnings when you run the above command.

Java Code

Now let us look at the relevant Java code. I am using the same Knock knock example from the Oracle Java SE documentation, with slight modifications that enable it to serve concurrent clients and print a message when a client connects, in order to demonstrate TCP load balancing in action.



import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {
    public static void main(String[] args) throws IOException {

        if (args.length != 1) {
            System.err.println("Usage: java Server <port number>");
            System.exit(1);
        }

        int portNumber = Integer.parseInt(args[0]);
        ServerSocket serverSocket = new ServerSocket(portNumber);
        while (true) {
            // Hand each accepted connection to its own thread so that clients are served concurrently
            Socket clientSocket = serverSocket.accept();
            serveRequest(clientSocket, portNumber);
        }
    }

    private static void serveRequest(final Socket clientSocket, final int portNumber) throws IOException {
        Runnable runnable = new Runnable() {
            public void run() {
                try {
                    PrintWriter out =
                            new PrintWriter(clientSocket.getOutputStream(), true);
                    BufferedReader in = new BufferedReader(
                            new InputStreamReader(clientSocket.getInputStream()));
                    String inputLine, outputLine;
                    System.out.println("Got message...");

                    // Initiate conversation with client
                    KnockKnockProtocol kkp = new KnockKnockProtocol();
                    outputLine = kkp.processInput(null);
                    out.println(outputLine);

                    while ((inputLine = in.readLine()) != null) {
                        outputLine = kkp.processInput(inputLine);
                        out.println(outputLine);
                        if (outputLine.equals("Bye."))
                            break;
                    }
                } catch (IOException e) {
                    System.out.println("Exception caught when trying to listen on port "
                            + portNumber + " or listening for a connection");
                    System.out.println(e.getMessage());
                } finally {
                    // Release the connection once the conversation is over
                    try { clientSocket.close(); } catch (IOException ignored) { }
                }
            }
        };
        new Thread(runnable).start();
    }
}



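import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;

public class Client {
    public static void main(String[] args) throws IOException {

        if (args.length != 2) {
            System.err.println(
                    "Usage: java Client <host name> <port number>");
            System.exit(1);
        }

        String hostName = args[0];
        int portNumber = Integer.parseInt(args[1]);

        // try-with-resources closes the socket when the conversation ends
        try (Socket kkSocket = new Socket(hostName, portNumber)) {
            PrintWriter out = new PrintWriter(kkSocket.getOutputStream(), true);
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(kkSocket.getInputStream()));
            BufferedReader stdIn =
                    new BufferedReader(new InputStreamReader(System.in));
            String fromServer;
            String fromUser;

            while ((fromServer = in.readLine()) != null) {
                System.out.println("Server: " + fromServer);
                if (fromServer.equals("Bye."))
                    break;

                // Read the user's reply from the console and send it to the server
                fromUser = stdIn.readLine();
                if (fromUser != null) {
                    System.out.println("Client: " + fromUser);
                    out.println(fromUser);
                }
            }
        } catch (UnknownHostException e) {
            System.err.println("Don't know about host " + hostName);
            System.exit(1);
        } catch (IOException e) {
            System.err.println("Couldn't get I/O for the connection to " +
                    hostName);
            System.exit(1);
        }
    }
}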

public class KnockKnockProtocol {
    private static final int WAITING = 0;
    private static final int SENTKNOCKKNOCK = 1;
    private static final int SENTCLUE = 2;
    private static final int ANOTHER = 3;

    private static final int NUMJOKES = 5;

    private int state = WAITING;
    private int currentJoke = 0;

    private String[] clues = { "Turnip", "Little Old Lady", "Atch", "Who", "Who" };
    private String[] answers = { "Turnip the heat, it's cold in here!",
            "I didn't know you could yodel!",
            "Bless you!",
            "Is there an owl in here?",
            "Is there an echo in here?" };

    // Simple state machine: returns the server's next line for the given client input
    public String processInput(String theInput) {
        String theOutput = null;

        if (state == WAITING) {
            theOutput = "Knock! Knock!";
            state = SENTKNOCKKNOCK;
        } else if (state == SENTKNOCKKNOCK) {
            if (theInput.equalsIgnoreCase("Who's there?")) {
                theOutput = clues[currentJoke];
                state = SENTCLUE;
            } else {
                theOutput = "You're supposed to say \"Who's there?\"! " +
                        "Try again. Knock! Knock!";
            }
        } else if (state == SENTCLUE) {
            if (theInput.equalsIgnoreCase(clues[currentJoke] + " who?")) {
                theOutput = answers[currentJoke] + " Want another? (y/n)";
                state = ANOTHER;
            } else {
                theOutput = "You're supposed to say \"" +
                        clues[currentJoke] +
                        " who?\"" +
                        "! Try again. Knock! Knock!";
                state = SENTKNOCKKNOCK;
            }
        } else if (state == ANOTHER) {
            if (theInput.equalsIgnoreCase("y")) {
                theOutput = "Knock! Knock!";
                // Move on to the next joke, wrapping around after the last one
                if (currentJoke == (NUMJOKES - 1))
                    currentJoke = 0;
                else
                    currentJoke++;
                state = SENTKNOCKKNOCK;
            } else {
                theOutput = "Bye.";
                state = WAITING;
            }
        }
        return theOutput;
    }
}

Running the Server 

The server can be run by providing the server port as an argument to the Java program. We will start the two TCP servers as follows:

      java -cp classes Server 4444 

      java -cp classes Server 5555 

Running the client 

Start two client instances. The client can be started by providing the server IP address and port as follows:

      java -cp classes Client localhost 8080

Note that we are providing the server port of the HAProxy frontend node, and not the actual port of any of the TCP servers.


The two clients should get connected to the two TCP servers. If TCP load balancing is working properly, you should see the following message printed on the console of each of the two servers.

       Got message...

Friday, November 22, 2013

WSO2 Multi-tenant Cache: JSR-107 (JCache) implementation based on Hazelcast

With the WSO2 Carbon 4.2.0 kernel release, we have moved from Tribes to Hazelcast, and have also introduced a new JSR-107 (JCache) caching implementation.

Core Features

  1. Local & distributed mode
  2. L1 & L2 caching model for distributed mode
  3. Multi-tenancy

This cache supports both local mode & distributed mode. If (Axis2) clustering is enabled, the cache works as a distributed cache. It is simply using a Hazelcast distributed map per cache. If clustering is not enabled, then the same caching implementation works as a local cache. There is no code change required in the code that uses JCache APIs to switch from local cache mode to distributed cache mode.

In addition, in distributed cache mode, in order to improve performance, we have an L1 cache, which is simply implemented using HashMaps, and the L2 cache is implemented using Hazelcast distributed maps. So, we first check the L1 cache, and if there is a cache miss, we go to the L2 cache. If the value is located in the L2 cache, then we also store it in the L1 cache. This is illustrated in the sequence diagram below.
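The lookup order can be sketched as follows. This is a simplified illustration & not the actual Carbon code: TwoLevelCacheSketch is a hypothetical name, and an ordinary shared Map stands in for the Hazelcast distributed map that backs the L2 cache.

```java
import java.util.HashMap;
import java.util.Map;

final class TwoLevelCacheSketch<K, V> {
    private final Map<K, V> l1 = new HashMap<K, V>(); // node-local L1 cache
    private final Map<K, V> l2;                       // stand-in for the distributed L2 map
    int l2Lookups = 0;                                // counts how often L1 missed

    TwoLevelCacheSketch(Map<K, V> sharedL2) {
        this.l2 = sharedL2;
    }

    synchronized V get(K key) {
        V value = l1.get(key);
        if (value != null) {
            return value;          // L1 hit, no trip to the distributed map
        }
        l2Lookups++;
        value = l2.get(key);       // L1 miss, so fall back to L2
        if (value != null) {
            l1.put(key, value);    // remember it locally for next time
        }
        return value;
    }

    synchronized void put(K key, V value) {
        l1.put(key, value);
        l2.put(key, value);
    }
}
```

Two instances that share the same L2 map behave like two cluster members: a value put on one is found by the other through L2 on the first read, and through its own L1 after that.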

In distributed mode, if a cache entry is removed, invalidated or changed, the Hazelcast listeners we have registered get triggered on each & every Hazelcast cluster member. That results in the values in the L1 cache & L2 cache being removed or updated. The following sequence diagram shows this flow.
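Here is a rough sketch of the listener idea, covering only the removal case. It does not use the Hazelcast API: SharedMapSketch plays the role of the distributed map & simply calls every registered listener, and both class names are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Stand-in for a distributed map that notifies every registered member of removals. */
final class SharedMapSketch {
    interface RemovalListener {
        void entryRemoved(String key);
    }

    private final Map<String, Integer> data = new ConcurrentHashMap<String, Integer>();
    private final List<RemovalListener> listeners = new CopyOnWriteArrayList<RemovalListener>();

    void addListener(RemovalListener listener) { listeners.add(listener); }
    Integer get(String key) { return data.get(key); }
    void put(String key, Integer value) { data.put(key, value); }

    void remove(String key) {
        data.remove(key);
        for (RemovalListener listener : listeners) {
            listener.entryRemoved(key); // every member hears about the removal
        }
    }
}

/** One cluster member: a local L1 map in front of the shared L2 map. */
final class MemberCacheSketch implements SharedMapSketch.RemovalListener {
    private final Map<String, Integer> l1 = new HashMap<String, Integer>();
    private final SharedMapSketch l2;

    MemberCacheSketch(SharedMapSketch l2) {
        this.l2 = l2;
        l2.addListener(this);
    }

    synchronized Integer get(String key) {
        Integer value = l1.get(key);
        if (value == null) {
            value = l2.get(key);
            if (value != null) {
                l1.put(key, value);
            }
        }
        return value;
    }

    synchronized void put(String key, Integer value) {
        l1.put(key, value);
        l2.put(key, value);
    }

    void remove(String key) {
        l2.remove(key); // the listener callback below clears L1 on all members, including this one
    }

    @Override
    public synchronized void entryRemoved(String key) {
        l1.remove(key);
    }
}
```

Without the callback, a member that had already copied the entry into its L1 cache would keep serving the stale value after another member removed it.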

Each cache operation is automatically tenant qualified. The tenant is derived from the thread context using the CarbonContext tenant ID & tenant domain. Caches created by a particular tenant can only be accessed by that tenant. This ensures that tenants cannot gain illegal access to caches owned by other tenants. 
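One way to picture tenant qualification is a cache lookup that always prefixes the cache name with the tenant taken from the current thread. The sketch below is only my illustration of that idea & is not how Carbon implements it: the ThreadLocal stands in for CarbonContext, and TenantCacheSketch, setTenant & getCache are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class TenantCacheSketch {
    // Hypothetical stand-in for the tenant domain that CarbonContext carries on the current thread
    private static final ThreadLocal<String> CURRENT_TENANT = new ThreadLocal<String>();

    // All caches, keyed by "<tenant domain>/<cache name>"
    private static final Map<String, Map<String, Object>> CACHES =
            new ConcurrentHashMap<String, Map<String, Object>>();

    static void setTenant(String tenantDomain) {
        CURRENT_TENANT.set(tenantDomain);
    }

    /** Returns the cache with the given name that belongs to the current thread's tenant. */
    static Map<String, Object> getCache(String cacheName) {
        String tenant = CURRENT_TENANT.get();
        if (tenant == null) {
            throw new IllegalStateException("No tenant is set on this thread");
        }
        String qualifiedName = tenant + "/" + cacheName;
        return CACHES.computeIfAbsent(qualifiedName,
                name -> new ConcurrentHashMap<String, Object>());
    }
}
```

Because the caller never supplies the tenant, code running on behalf of one tenant has no way to name a cache that belongs to another.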

Code Examples

The following examples illustrate how the JCache APIs can be used to acquire the cache & carry out caching operations.

Example 1: Simple cache creation

        CacheManager cacheManager = Caching.getCacheManagerFactory().getCacheManager("sampleCacheManager");
        Cache<String, Integer> cache = cacheManager.getCache("sampleCache");

        String key = "sampleKey"; // illustrative key; the original snippet did not show its value
        int value1 = 9876;
        cache.put(key, value1);

        int value = cache.get(key).intValue();

Example 2: Using cache configuration with a custom cache expiry

        CacheManager cacheManager = Caching.getCacheManagerFactory().getCacheManager("test");
        String cacheName = "cacheXXX";

        // Entries expire 10 seconds after they were last modified
        Cache<String, Integer> cache = cacheManager.<String, Integer>createCacheBuilder(cacheName).
                setExpiry(CacheConfiguration.ExpiryType.MODIFIED, new CacheConfiguration.Duration(TimeUnit.SECONDS, 10)).
                build();

        String key = "sampleKey"; // illustrative key; the original snippet did not show its value
        int value = 9876;
        cache.put(key, value);

        assertEquals(cache.get(key).intValue(), value);

The relevant code is available at the following SVN locations;

Saturday, July 06, 2013

Travel Dua

اللَّهُ أَكْبَرُ، اللَّهُ أَكْبَرُ، اللَّهُ أَكْبَرُ سُبْحَانَ الَّذِي سَخَّرَ لَنَا هَذَا وَمَا كُنَّا لَهُ مُقْرِنِينَ وَإِنَّا إِلَى رَبِّنَا لَمُنْقَلِبُونَ، اللَّهُمَّ إِنَّا نَسْأَلُكَ فِي سَفْرِنَا هَذَا الْبِرَّ وَالتَّقْوَى ، وَمِنَ الْعَمَلِ مَا تَرْضَى ، اللَّهُمَّ هَوَّنْ عَلَيْنَا سَفْرِنَا هَذَا وَاطْوَعَّنَّا بَعْدهُ ، اللَّهُمَّ أَنْتَ الصَّاحِبُ فِي السَّفَرِ، وَالْخَلِيفَةُ فِي الأَهْلِ، اللَّهُمَّ إِنِّي أَعُوْذُ بِكَ مِنْ وَعْثَاءِ السَّفَرِ، وَكآبَةِ الْمَنْظَرِ وَسُوءِ المُنْقَلَبِ فِي الْمَالِ وَالأَهْلِ + آيِبُونَ تَائْبُونَ عَابِدُونَ لِرَبِّنَا حَامِدُونَ

(English translation) Allah is the Greatest, Allah is the Greatest, Allah is the Greatest. Glory be to Him who has subjected this to us, for we could not have done so ourselves, and to our Lord we shall surely return. O Allah, we ask You on this journey of ours for righteousness and piety, and for deeds that please You. O Allah, make this journey easy for us and shorten its distance for us. O Allah, You are the Companion on the journey and the Guardian of the family. O Allah, I seek refuge in You from the hardship of travel, from distressing sights, and from an ill-fated return in wealth and family. + We return, repenting, worshipping and praising our Lord.