Related Topics: Java Developer Magazine


Achieving Thread Synchronization & Parallelized Execution in Java

Building multithreaded execution capabilities in a Java application

We instantiate a CyclicBarrier object for the "All" activity and pass a reference to it to every ActivityRunner object created for that "All" activity. After launching the child activities in independent threads, the thread corresponding to the "All" activity (in other words, the process thread) calls the await() method on this barrier to begin waiting at the synchronization point. It waits there until all the child activity threads complete their execution and join the barrier (see Listing 2).

After completing the execution of the corresponding activity, each ActivityRunner calls the await() method on the barrier and waits for others to join. Inside the ActivityRunner we have:

class ActivityRunner implements java.lang.Runnable {
    private Activity act; // reference to the activity
    private CyclicBarrier barrier; // reference to the barrier
    ...
    synchronized public void run() {
        ...
        try {
            act.execute(); // execute the activity
        } finally {
            // await() throws checked exceptions; their handling is elided here
            barrier.await(); // join the sync point and wait for others
        }
        ...
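
To make the fork-and-join pattern above concrete, here is a minimal, self-contained sketch. It is not the article's Listing 2: DemoActivity, DemoActivityRunner, and AllActivityDemo are hypothetical names introduced only for illustration. It also assumes the barrier is sized to the number of children plus one, since the launching thread calls await() as well.

```java
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;

// Hypothetical stand-in for the article's Activity type.
interface DemoActivity {
    void execute();
}

// Runs one child activity, then joins the shared barrier.
class DemoActivityRunner implements Runnable {
    private final DemoActivity act;
    private final CyclicBarrier barrier;

    DemoActivityRunner(DemoActivity act, CyclicBarrier barrier) {
        this.act = act;
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            act.execute();
        } finally {
            try {
                barrier.await(); // join the sync point even if execute() failed
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (BrokenBarrierException e) {
                // Another party was interrupted or timed out; nothing left to wait for.
            }
        }
    }
}

class AllActivityDemo {
    // Launches one thread per name, then blocks until every child has joined the barrier.
    static List<String> runAll(List<String> names) {
        List<String> completed = new CopyOnWriteArrayList<>();
        // Assumption: parties = children + 1, because this thread also calls await().
        CyclicBarrier barrier = new CyclicBarrier(names.size() + 1);
        for (String name : names) {
            DemoActivity act = () -> completed.add(name);
            new Thread(new DemoActivityRunner(act, barrier)).start();
        }
        try {
            barrier.await(); // the "All" activity waits here for its children
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for children", e);
        } catch (BrokenBarrierException e) {
            throw new IllegalStateException("barrier broken", e);
        }
        return completed;
    }

    public static void main(String[] args) {
        System.out.println("children completed: " + runAll(List.of("A", "B", "C")).size());
    }
}
```

Calling await() inside finally mirrors the snippet above: a child that fails in execute() still reaches the barrier, so the waiting process thread is not left blocked.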

Sub-Process Spawn
The "Spawn" activity in a process launches the execution of another process so that the launched process executes independently and the main process moves on to the activity subsequent to the "Spawn" activity, without waiting for the launched process to complete. In WS-BPEL, an equivalent would be to have a process with a <receive> element at the top and no <reply> element at the end of the process, indicating that this process is always invoked asynchronously from the calling process.

We refer to the launched process as a sub-process and the process that is launching it as the main process. In the execute() method of the spawn activity, a new process object instance (similar to the way it's done for a normal process) is created for the sub-process from the sub-process's process definition. Then, values for its input parameters are set. ThreadPoolExecutor and ProcRunner classes are used to run the sub-process independently in a separate thread.

ProcRunner procRunner = new ProcRunner(subProcess); // ProcRunner implements Runnable
// Spawn the sub-process in a new thread.
procPooledExecutor.execute(procRunner);
// End of execution of the spawn activity

The spawn activity's execution is over right after the launch of the sub-process and control immediately returns to the main process and moves on to the subsequent activity of the main process.
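
The fire-and-forget behavior of the spawn activity can be sketched as follows. This is an illustration under stated assumptions, not the BPMS server's code: SpawnDemo and spawnAndContinue() are hypothetical names, the pool is created with Executors.newFixedThreadPool, and the two CountDownLatch objects exist only so the demo can observe that the caller moved on while the sub-process was still running.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SpawnDemo {
    // Returns true if the caller continued while the spawned task was still
    // running, and the task later finished on its own.
    static boolean spawnAndContinue() {
        ExecutorService procPooledExecutor = Executors.newFixedThreadPool(2);
        CountDownLatch release = new CountDownLatch(1);  // demo only: holds the sub-process back
        CountDownLatch finished = new CountDownLatch(1); // demo only: signals sub-process completion

        // Hypothetical stand-in for ProcRunner wrapping a sub-process.
        Runnable procRunner = () -> {
            try {
                release.await(); // simulate work that outlasts the spawn activity
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            finished.countDown();
        };

        // execute() hands the task to a pool thread and returns without waiting for it.
        procPooledExecutor.execute(procRunner);

        // The "main process" is already past the spawn; the sub-process cannot
        // have finished yet because the release latch is still closed.
        boolean stillRunning = finished.getCount() == 1;

        release.countDown(); // let the sub-process complete
        boolean finishedOk;
        try {
            finishedOk = finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            finishedOk = false;
        }
        procPooledExecutor.shutdown();
        return stillRunning && finishedOk;
    }

    public static void main(String[] args) {
        System.out.println("main continued before sub-process ended: " + spawnAndContinue());
    }
}
```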

Process State Storage Shared Update
The state (including context) of the executing process is persisted in the database by the BPMS server at regular logical intervals such as at the start of the process, before the core execution of activities, and after execution of activities. The process parameters (variables) and activity states are captured along with the data items such as timestamps that are relevant for process history. The process state information is used by the BPMS server to faithfully recover/restore and revive a process instance from an exception, or revive a passivated process instance upon the arrival of the message it was waiting for and so on. That way, the process execution can continue from a logically correct (valid) state.

Since a process can involve parallel paths (fork and join), and given that activities executing in parallel can update the same process parameter, the process state has to be stored in such a way that the context is logically consistent across activities. That way, when the process resumes from this state, the activities receive a consistent set of parameter values in the context. This is ensured in the BPMS server by synchronizing the state storage update across all the parallel activity threads using the CyclicBarrier from the concurrent library, and by making sure the process state is stored only when all the executing parallel activities are in the same stage of execution (i.e., either all of them are in the pre-execute stage or all of them are in the post-execute stage). Activities that are in a waiting state (say, a user activity waiting for a user's message indicating the action is complete) are not considered for this synchronization: they are currently non-executing, so there is no chance that they update any process parameter.

In the activity pre-execute stage, a CyclicBarrier instance is created with the number of parties equal to the number of activities executing at that time. The CyclicBarrier is also given a common task that it executes once all the threads have joined it and before they are released from the barrier. That common task implements java.lang.Runnable, and its run() method contains the logic for storing the process state, context, and activity state in the database. Each activity then supplies its activity state information and joins the barrier by calling the await() method on it. When the last executing activity joins the barrier, the common task runs and updates the database with the process state, while all the activity threads remain on hold at the barrier. This is a beautiful feature of the CyclicBarrier: it runs the common task just once and makes sure it's done before the threads are released from the barrier. In the process object, we have the code as shown in Listing 3.

In the Activity pre-execute and post-execute code, we first call createStateBarrier() and then join and wait.

thisprocess.createStateBarrier();
thisprocess.getStateRef().addActivityStateDetails(activityStateDetail);
thisprocess.stateBarrier.await();
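
The barrier-with-common-task pattern described above can be sketched as below. This is not Listing 3: StateBarrierDemo and storeStateOnce() are hypothetical names, and the "database write" is replaced by in-memory counters so the example stays self-contained. It relies only on the CyclicBarrier(int parties, Runnable barrierAction) constructor.

```java
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class StateBarrierDemo {
    // Returns {number of times the state was stored, number of activity details visible at store time}.
    static int[] storeStateOnce(int activeActivities) {
        AtomicInteger stores = new AtomicInteger();
        AtomicInteger detailsSeenAtStore = new AtomicInteger(-1);
        List<String> activityStateDetails = new CopyOnWriteArrayList<>();

        // Common task: stands in for persisting process state, context, and activity state.
        Runnable storeState = () -> {
            stores.incrementAndGet();
            detailsSeenAtStore.set(activityStateDetails.size());
        };

        // The barrier runs storeState once, after the last party arrives
        // and before any waiting thread is released.
        CyclicBarrier stateBarrier = new CyclicBarrier(activeActivities, storeState);

        Thread[] threads = new Thread[activeActivities];
        for (int i = 0; i < activeActivities; i++) {
            final int id = i;
            threads[i] = new Thread(() -> {
                // Each activity supplies its state details, then joins the barrier.
                activityStateDetails.add("activity-" + id + ":pre-execute");
                try {
                    stateBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new int[] { stores.get(), detailsSeenAtStore.get() };
    }

    public static void main(String[] args) {
        int[] result = storeStateOnce(4);
        System.out.println("stores=" + result[0] + ", details seen=" + result[1]);
    }
}
```

Because every thread adds its details before calling await(), the common task sees the details from all parties when it runs, which is the consistency property the state update depends on.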

At post-execute of activities, a new CyclicBarrier is created similar to what we see above for pre-execute. Ideally all the activity threads executing simultaneously would reach the post-execute stage and join the state update synchronization point to do a synchronized update. However, some activities might have gone into a waiting state (say, a user activity expecting a user action) and so wouldn't be active anymore. Such activities inform the thisprocess object that they are in a waiting state and that they should not be considered for a shared state update, since they can't possibly update the state or bring it into an inconsistent state. When this happens, the thisprocess object increments the numOfThreadsWaiting counter so that when the stateBarrier gets created, it's created with one fewer party. However, if the barrier has already been created by this time, such an activity joins the stateBarrier before it starts waiting, so that the other executing threads can join this barrier in their post-execute stage and complete the state update.

Semaphores
In the code above, _mutex refers to a Semaphore object. Semaphore is a class provided by the java.util.concurrent package to manage critical sections in the code that need to be made safe for simultaneous access by multiple threads. This class is a counting semaphore: it restricts the number of threads that can access some resource in the program at the same time.

In the case above, access to the thread counter variables (numOfThreadsWaiting, totalThreadCount) is controlled. At each logical point in the process execution where the process state needs to be stored, a barrier should be created only for the number of threads currently active (not waiting), and since there would be as many threads executing in parallel as there are active activities, the access/update of these counter variables must be made thread-safe.

Semaphore _mutex = new Semaphore(1, true);

We create an instance of the Semaphore class here each time we reach the logical point to store the process state and then use it to lock access to the critical section. The first argument to the Semaphore constructor is 1, indicating that only one thread at a time can acquire the lock and execute the critical section code (a mutual exclusion lock); other threads trying to acquire it at the same time simply wait until the lock is released. The second argument, true, makes the semaphore fair, so waiting threads acquire the lock in the order in which they requested it.
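
As a rough illustration of guarding the counters with a binary semaphore, consider the sketch below. ThreadCounters, its methods, and SemaphoreDemo are hypothetical; only the field names numOfThreadsWaiting and totalThreadCount come from the text. The sketch keeps one shared Semaphore instance for the lifetime of the counters, because threads exclude one another only when they acquire the same semaphore object.

```java
import java.util.concurrent.Semaphore;

class ThreadCounters {
    // One permit, fair ordering: behaves as a mutual exclusion lock.
    private final Semaphore mutex = new Semaphore(1, true);
    private int totalThreadCount;
    private int numOfThreadsWaiting;

    void activityStarted() {
        mutex.acquireUninterruptibly();
        try {
            totalThreadCount++;
        } finally {
            mutex.release(); // always release, even if the critical section throws
        }
    }

    void activityWaiting() {
        mutex.acquireUninterruptibly();
        try {
            numOfThreadsWaiting++;
        } finally {
            mutex.release();
        }
    }

    // Number of parties a state barrier would need: active, non-waiting threads.
    int activeCount() {
        mutex.acquireUninterruptibly();
        try {
            return totalThreadCount - numOfThreadsWaiting;
        } finally {
            mutex.release();
        }
    }
}

class SemaphoreDemo {
    // Starts 'started' threads; the first 'waiting' of them also report a waiting state.
    static int countActive(int started, int waiting) {
        ThreadCounters counters = new ThreadCounters();
        Thread[] threads = new Thread[started];
        for (int i = 0; i < started; i++) {
            final boolean goesWaiting = i < waiting;
            threads[i] = new Thread(() -> {
                counters.activityStarted();
                if (goesWaiting) {
                    counters.activityWaiting();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counters.activeCount();
    }

    public static void main(String[] args) {
        System.out.println("active threads: " + countActive(8, 3));
    }
}
```

The acquire and release calls are paired through try/finally so that an exception inside the critical section cannot leave the permit held.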

More Stories By Parameswaran Seshan

Parameswaran Seshan performs the role of an independent educator/trainer, architect, researcher, and architecture consultant, in Information Technology (IT). He teaches architecture, design and technology related courses. Prior to this, he worked as Principal (Education and Research) with E-Comm Research Lab, Infosys Technologies Limited, Bangalore, India. He has more than 15 years of work experience in the IT industry, involving teaching, architecture, research, and programming. His areas of interest include Enterprise Architecture, Process-centric architecture, Intelligent software systems, Intelligent agents, software architecture, Business Process Management systems, Web services and Java. You can reach Parameswaran at, contact {at} bitsintune [dot] com.
