Java Solutions for Distributed Transactions and/or Data Shared in Cluster

August 6, 2015

What are the best approaches to clustering/distributing a Java server application?
I’m looking for an approach that allows you to scale horizontally by adding more application servers, and more database servers.

  • What technologies (software engineering techniques or specific technologies) would you suggest to approach this type of problem?
  • What techniques do you use to design a persistence layer that scales to many readers/writers?
    How do you scale application transactions and access to shared data? (The best approach is to eliminate shared data; what techniques can you apply to eliminate it?)
  • Different approaches seem to be needed depending on whether your transactions are read-heavy or write-heavy, but I feel that if you can optimize a “write”-heavy application, it would also be efficient for “reads”.

The “best” solution would allow you to write a Java application for a single node and hopefully “hide” most of the details of accessing/locking shared data.

In a distributed environment, the most difficult issue always comes down to multiple transactions accessing shared data. There seem to be two common approaches to concurrent transactions.

  1. Explicit locks (which is extremely error prone and slow to coordinate across multiple nodes in a distributed system)
  2. Software transactional memory (STM) AKA optimistic concurrency where a transaction is rolled back during a commit if it discovers that shared state has changed (and the transaction can later be retried).
    Which approach scales better and what are the trade-offs in a distributed system?
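To make the second approach concrete, here is a minimal single-JVM sketch of optimistic concurrency, assuming one shared value held in an `AtomicReference`. The class `OptimisticCell` and its `update` method are hypothetical names made up for this illustration; a distributed version would need a versioned store in place of the in-memory reference.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

/** Hypothetical helper illustrating optimistic concurrency on one shared value. */
final class OptimisticCell<T> {
    private final AtomicReference<T> state;

    OptimisticCell(T initial) {
        state = new AtomicReference<>(initial);
    }

    /**
     * Reads a snapshot, computes a new value without holding a lock, and
     * commits only if the state is still the snapshot that was read.
     * On a conflict the work is discarded and retried.
     * Returns the number of attempts that were needed.
     */
    int update(UnaryOperator<T> transaction) {
        int attempts = 0;
        while (true) {
            attempts++;
            T snapshot = state.get();
            T proposed = transaction.apply(snapshot);
            if (state.compareAndSet(snapshot, proposed)) {
                return attempts;
            }
            // Another writer committed first: fall through and retry.
        }
    }

    T get() {
        return state.get();
    }
}
```

The trade-off shows up in the retry loop: no lock is ever held, but under heavy write contention the same transaction body may run several times, so it should be cheap and free of side effects.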

I’ve been researching scaling solutions (and in general applications that provide an example of how to scale) such as:

  1. Terracotta – provides “transparent” scaling by extending the Java memory model to include distributed shared memory, using Java’s concurrency locking mechanisms (synchronized, ReentrantReadWriteLock).
  2. Google App Engine Java – lets you write Java (or Python) applications that are distributed amongst “cloud” servers, where you distribute which server handles a transaction and use BigTable to store your persistent data (I’m not sure how you handle transactions that access shared data, or lock contention, in order to scale effectively).
  3. Darkstar MMO Server – Darkstar is Sun’s open source MMO (massively multiplayer online) game server. It scales transactions in a thread-transactional manner: a given transaction is only allowed to run for a certain amount of time before committing, and if it takes too long it is rolled back (kind of like software transactional memory). They’ve been doing research into supporting a multi-node server setup for scaling.
  4. Hibernate’s optimistic locking – if you are using Hibernate you can use their optimistic concurrency support to support software transactional memory type behavior
  5. Apache CouchDB – is supposed to “scale” naturally to many reader/writer DBs in a mesh configuration. (Is there a good example of how you manage locking data or ensure transaction isolation?)
  6. JCache – scales “read”-heavy apps by caching the results of common queries; in Google App Engine you can use it to access memcached and to cache other frequently read data.
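The read-caching idea in the last item can be sketched in plain Java. This is only an illustration of the read-through pattern, not the JCache (JSR-107) API; `ReadThroughCache`, `invalidate`, and `loadCount` are hypothetical names, and the loader function stands in for an expensive database query.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical read-through cache; not the JCache (JSR-107) API. */
final class ReadThroughCache<K, V> {
    private final Map<K, V> entries = new ConcurrentHashMap<>();
    private final Function<K, V> loader;
    private final AtomicInteger loads = new AtomicInteger();

    ReadThroughCache(Function<K, V> loader) {
        this.loader = loader;
    }

    /** Returns the cached value, calling the loader only on a miss. */
    V get(K key) {
        return entries.computeIfAbsent(key, k -> {
            loads.incrementAndGet();
            return loader.apply(k);
        });
    }

    /** Drops a cached entry, for example after the underlying data was written. */
    void invalidate(K key) {
        entries.remove(key);
    }

    /** Number of times the loader actually ran (cache misses). */
    int loadCount() {
        return loads.get();
    }
}
```

Writers have to call `invalidate` (or the cache needs an expiry policy), which is why this pattern helps read-heavy workloads far more than write-heavy ones.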

Terracotta seems to be the most complete solution, in that you can “easily” modify an existing server application to support scaling (after defining @Root objects and @AutoLockRead/Write methods). The trouble is that, to really get the most performance out of a distributed application, optimization for distributed systems can’t be an afterthought; you have to design the application knowing that object access could potentially be blocked by network I/O.

To scale properly it seems like it always comes down to partitioning data and load balancing transactions such that a given “execution unit” (cpu core -> thread -> distributed application node -> DB master node)

It seems, though, that to make any app scale properly by clustering you need to be able to partition your transactions in terms of their data access reads/writes. What solutions have people come up with to distribute their application’s data (Oracle, Google BigTable, MySQL, data warehousing), and how do you generally manage partitioning data (many write masters, with many more read DBs, etc.)?

In terms of scaling your data persistence layer, what type of configuration scales out best for partitioning your data across many readers and many writers? (Generally I’d partition my data so that a given user, or whatever core entity is your “root” object, is owned by a single master DB.)
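As a rough sketch of the “one user, one master DB” idea, the router below maps a user ID to one entry in a fixed list of master databases by hashing. `ShardRouter`, `masterFor`, and the master names used with it are hypothetical; simple modulo hashing is assumed here only because it is the shortest thing that demonstrates ownership.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical router that assigns each user to exactly one master database. */
final class ShardRouter {
    private final List<String> masters;

    ShardRouter(List<String> masters) {
        if (masters.isEmpty()) {
            throw new IllegalArgumentException("at least one master is required");
        }
        this.masters = new ArrayList<>(masters); // defensive copy
    }

    /** Every call with the same userId returns the same master. */
    String masterFor(String userId) {
        // floorMod keeps the index non-negative even for negative hash codes.
        int index = Math.floorMod(userId.hashCode(), masters.size());
        return masters.get(index);
    }
}
```

With plain modulo hashing, adding or removing a master remaps most users, so a real deployment would more likely use consistent hashing or a lookup table of user-to-master assignments.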

6 Responses to “Java Solutions for Distributed Transactions and/or Data Shared in Cluster”

  1. While Oracle Coherence and a lot of the other solutions suggested are good for sharing data, you only cited locking and STM as ways to manage state mutation in a distributed environment; those are both generally pretty poor ways to scale state management. On a different site, I recently posted the following about how to implement (for example) sequence counters:

    If you’re looking at a counter, then using something like a Coherence EntryProcessor will easily achieve “once-and-only-once” behavior and HA for any number of monotonically increasing sequences; here’s the entire implementation:

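    import com.tangosol.util.InvocableMap;
    import com.tangosol.util.processor.AbstractProcessor;
    public class SequenceCounterProcessor
            extends AbstractProcessor {
        public Object process(InvocableMap.Entry entry) {
            long l = entry.isPresent() ? (Long) entry.getValue() + 1 : 0;
            entry.setValue(l); // store the new value; otherwise the counter never advances
            return l;
        }
    }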

    Yup. That’s it. Automatic and seamless HA, dynamic scale-out elasticity, once-and-only-once behavior, etc. Done.

    The EntryProcessor is a type of distributed closure that we introduced in 2005.

    As an aside, in Java 8 (not yet released), Project Lambda introduces official closure support in the language and the standard libraries.

    Basically, the idea is to deliver the closure to the location of the “owner” of the data in a distributed environment. Coherence dynamically manages data ownership by using dynamic partitioning, allowing the distributed system to load balance data across the various machines and nodes that are running. In fact, by default all of this is 100% automated, so you never actually tell it where to put the data, or how much data goes where. Additionally, there are secondary (and perhaps tertiary etc.) copies of the data managed on other nodes and other physical servers, to provide high availability in case a process fails or a server dies. Again, the management of these backup copies is completely automatic and completely synchronous by default, meaning that the system is 100% HA by default (i.e. with no configuration).

    When the closure arrives at the data owner, it is executed in a transactional workspace, and if the operation completes successfully then it is shipped to the backup for safe keeping. The data mutation (e.g. the result of the operation) is only made visible to the remainder of the system once the backup has been successfully made.

    A few optimizations to the above include adding the ExternalizableLite & PortableObject interfaces for optimized serialization, and avoiding the serialization of the boxed long by going after the “network ready” form of the data directly:

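    public Object process(InvocableMap.Entry entry) {
        try {
            BinaryEntry binentry = (BinaryEntry) entry;
            long l = entry.isPresent() ? binentry.getBinaryValue()
                    .getBufferInput().readLong() + 1 : 0L;
            BinaryWriteBuffer buf = new BinaryWriteBuffer(8);
            buf.getBufferOutput().writeLong(l);         // serialize the new value
            binentry.updateBinaryValue(buf.toBinary()); // store it without boxing
            return l;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }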

    And since it’s stateless, why not have a singleton instance ready to go?

    public static final SequenceCounterProcessor INSTANCE =
            new SequenceCounterProcessor();

    Using it from anywhere on the network is as simple as a single line of code:

    long l = (Long) sequences.invoke(x, SequenceCounterProcessor.INSTANCE);

    Where “x” is any object or name that identifies the particular sequence counter you want to use. For more info, see the Coherence knowledge base at:

    Oracle Coherence is a distributed system. Whenever you start a Coherence node, it joins with other Coherence nodes that are already running, and dynamically forms an elastic cluster. That cluster hosts data in a partitioned, highly available (HA), and transactionally consistent manner, and hosts operations (like the one I showed above) that operate on that data in a “once and only once” manner.
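As a rough, single-process illustration of the flow described in this comment (deliver the operation to the data owner, run it atomically, copy the result to a backup), here is a sketch in plain Java. It does not use the Coherence API: `OwnedPartition`, `invoke`, and `backupValue` are hypothetical names, and two `ConcurrentHashMap` instances stand in for the owning node and the backup node.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;

/** Hypothetical stand-in for one partition owner plus its backup copy. */
final class OwnedPartition<K, V> {
    private final Map<K, V> primary = new ConcurrentHashMap<>();
    private final Map<K, V> backup = new ConcurrentHashMap<>();

    /**
     * Runs the processor atomically for one key. The processor receives the
     * current value (null if the key is absent) and must return a non-null
     * new value. The result is copied to the backup map before compute()
     * publishes it in the primary map.
     */
    V invoke(K key, UnaryOperator<V> processor) {
        return primary.compute(key, (k, oldValue) -> {
            V newValue = processor.apply(oldValue);
            backup.put(k, newValue);
            return newValue;
        });
    }

    /** What a backup node would hold for this key in this simulation. */
    V backupValue(K key) {
        return backup.get(key);
    }
}
```

With this stand-in, the sequence counter becomes `partition.invoke("orders", v -> v == null ? 0L : v + 1)`: callers never take a lock themselves, because the operation is serialized at the single owner of the key.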

    Furthermore, in addition to the ability to invoke any of that logic or access any of that data transparently from any Coherence node, you can also invoke any of that logic or access any of that data transparently from any process on the network (subject to authentication and authorization, of course). So this code would work from any Coherence cluster node or from any (Java / C / C++ / C# / .NET) client:

    For the sake of full disclosure, I work at Oracle. The opinions and views expressed in this post are my own, and do not necessarily reflect the opinions or views of my employer.

  2. srini.venigalla

    Thought I found a great Java Clustering/Distributed platform, wanted to reopen this-


    I ran the test programs, it is very cool, very light-weight and simple to use. It automatically detects the Cluster Members in a Peer-to-Peer configuration. The opportunities are limitless.

  3. Eugene Steinberg

    Maybe those slides will be helpful. From our experience I would recommend Oracle (Tangosol) Coherence and GigaSpaces as the most powerful data and processing distribution frameworks out there. Depending on the exact nature of the problem, one of those may shine. Terracotta is also quite applicable to some of the problems.

  4. Don’t forget Erlang’s Mnesia.

    Mnesia gives you stuff like transactions that you’re used to in a normal DB, but provides real-time operations and fault-tolerance. Plus you can reconfigure things without downtime. Downside is that it’s a memory resident database, so you have to fragment really large tables. Largest table size is 4Gb.

  5. srini.venigalla

    Thanks for nicely summarizing all possibilities in one place.

    One technique is missing here though. It is MapReduce-Hadoop. If it is possible to fit the problem into the MapReduce paradigm, it is perhaps the most widely available solution. I also wonder if the Actor Framework pattern (JetLang, Kilim, etc) can be extended to a cluster.
