Thursday, 28 January 2016

Phaser in Java concurrency

Phaser is one of the synchronization aids provided by the java.util.concurrent package. It is similar to other synchronization barriers such as CountDownLatch and CyclicBarrier. What sets Phaser apart is that it is reusable (like CyclicBarrier) and more flexible in usage. In both CountDownLatch and CyclicBarrier the number of parties (threads) registered for waiting can't change, whereas in Phaser that number can vary. Note that Phaser was introduced in Java 7.

Phaser is most suitable where threads need to be synchronized over one or more phases of activity. Phaser can be used to synchronize a single phase, but in that case it acts more like a CyclicBarrier. It is better suited to cases where threads should wait for a phase to finish, advance to the next phase, wait again for that phase to finish, and so on.

How Phaser is more flexible

Unlike the case for other barriers, the number of parties registered to synchronize on a phaser may vary over time. Tasks may be registered at any time (using methods register(), bulkRegister(int), or by specifying initial number of parties in the constructor). Tasks may also be optionally deregistered upon any arrival (using arriveAndDeregister()).
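As a minimal sketch of how the party count can change over time, the following uses only the methods named above, plus getRegisteredParties() to observe the count. The class name DynamicPartiesSketch and the method partyCounts() are made up for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;

class DynamicPartiesSketch {

    // Returns the number of registered parties observed after each step.
    static int[] partyCounts() {
        Phaser phaser = new Phaser(1);                    // one party via the constructor
        int afterConstructor = phaser.getRegisteredParties();

        phaser.register();                                // add a single party
        int afterRegister = phaser.getRegisteredParties();

        phaser.bulkRegister(3);                           // add three parties at once
        int afterBulk = phaser.getRegisteredParties();

        phaser.arriveAndDeregister();                     // one party arrives and leaves
        int afterDeregister = phaser.getRegisteredParties();

        return new int[] {afterConstructor, afterRegister, afterBulk, afterDeregister};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(partyCounts())); // prints [1, 2, 5, 4]
    }
}
```

Everything here runs on a single thread, which works because a Phaser counts parties and does not track which thread a party belongs to.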

How Phaser works

Phaser class has 4 constructors

  • Phaser() - Creates a new phaser with no initially registered parties, no parent, and initial phase number 0.
  • Phaser(int parties) - Creates a new phaser with the given number of registered unarrived parties, no parent, and initial phase number 0.
  • Phaser(Phaser parent) - Creates a new phaser with the given parent with no initially registered parties.
  • Phaser(Phaser parent, int parties) - Creates a new phaser with the given parent and number of registered unarrived parties.
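
The four constructors can be compared side by side with a small sketch. The class name ConstructorsSketch and the method describe() are hypothetical; getRegisteredParties(), getParent() and getPhase() are used only to inspect the result.

```java
import java.util.concurrent.Phaser;

class ConstructorsSketch {

    // Builds one phaser per constructor and summarizes their state.
    static String describe() {
        Phaser empty = new Phaser();                  // no parties, no parent
        Phaser three = new Phaser(3);                 // three unarrived parties
        Phaser parent = new Phaser();
        Phaser child = new Phaser(parent);            // has a parent, no parties yet
        Phaser childWithTwo = new Phaser(parent, 2);  // has a parent and two parties

        return empty.getRegisteredParties() + " "
                + three.getRegisteredParties() + " "
                + child.getRegisteredParties() + " "
                + childWithTwo.getRegisteredParties() + " "
                + (childWithTwo.getParent() == parent) + " "
                + empty.getPhase();
    }

    public static void main(String[] args) {
        System.out.println(describe()); // prints 0 3 0 2 true 0
    }
}
```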

So the first step is to create a new instance of Phaser.

The next step is to register one or more parties with the Phaser. That can be done using register(), bulkRegister(int) or by specifying the number of parties in the constructor.

register() method

public int register()

Adds a new unarrived party to this phaser. It returns the arrival phase number to which this registration applied.
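
To illustrate the return value, here is a small sketch in which a party is registered in phase 0 and another one in phase 1. The class name RegisterSketch and the method registrationPhases() are assumptions made for this example.

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;

class RegisterSketch {

    // Returns the phase numbers reported by two register() calls.
    static int[] registrationPhases() {
        Phaser phaser = new Phaser(1);
        int firstRegistration = phaser.register();   // registered during phase 0

        // Both registered parties arrive, which completes phase 0.
        phaser.arrive();
        phaser.arrive();

        int secondRegistration = phaser.register();  // registered during phase 1
        return new int[] {firstRegistration, secondRegistration};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(registrationPhases())); // prints [0, 1]
    }
}
```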

Since Phaser is a synchronization barrier, each registered party has to signal that it has finished a phase. That is done using arrive() or one of the variants of the arrive() method, some of which also wait for the other parties. When the number of arrivals is equal to the number of registered parties, that phase is considered completed and the phaser advances to the next phase (if there is any), or terminates.

arrive() method

public int arrive()

Arrives at this phaser, without waiting for others to arrive. Note that arrive() method does not suspend execution of the calling thread. Returns the arrival phase number, or a negative value if terminated. Note that this method should not be called by an unregistered party.
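
A small single-threaded sketch can show that arrive() returns immediately and that the phase only advances once the last party has arrived. The class name ArriveSketch and the method run() are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;

class ArriveSketch {

    static int[] run() {
        Phaser phaser = new Phaser(2);

        int firstArrival = phaser.arrive();        // returns at once with phase 0
        int arrived = phaser.getArrivedParties();  // one of two parties has arrived
        int phaseBefore = phaser.getPhase();       // still phase 0

        int secondArrival = phaser.arrive();       // last arrival, also reports phase 0
        int phaseAfter = phaser.getPhase();        // the phaser has advanced to phase 1

        return new int[] {firstArrival, arrived, phaseBefore, secondArrival, phaseAfter};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [0, 1, 0, 0, 1]
    }
}
```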

arriveAndDeregister

public int arriveAndDeregister()

Arrives at this phaser and deregisters from it without waiting for others to arrive. Returns the arrival phase number, or a negative value if terminated.
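
As a rough sketch, the following shows one of two parties leaving the phaser, after which the remaining party alone is enough to complete the phase. ArriveAndDeregisterSketch and run() are names invented for this example.

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;

class ArriveAndDeregisterSketch {

    static int[] run() {
        Phaser phaser = new Phaser(2);

        int arrivalPhase = phaser.arriveAndDeregister(); // arrives in phase 0 and leaves
        int registered = phaser.getRegisteredParties();  // only one party remains
        int phaseBefore = phaser.getPhase();             // the other party has not arrived yet

        phaser.arrive();                                 // the remaining party arrives
        int phaseAfter = phaser.getPhase();              // phase 0 is complete

        return new int[] {arrivalPhase, registered, phaseBefore, phaseAfter};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [0, 1, 0, 1]
    }
}
```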

arriveAndAwaitAdvance

public int arriveAndAwaitAdvance()

Arrives at this phaser and awaits others. Returns the arrival phase number, or the (negative) current phase if terminated. If you want to wait for all the other registered parties to complete a given phase then use this method.
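
The waiting behaviour can be sketched with a few worker threads that each perform two steps. No thread should start step 2 before every thread has finished step 1. The class name AwaitAdvanceSketch, the method run(int) and the "step-1"/"step-2" log entries are illustrative assumptions.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Phaser;

class AwaitAdvanceSketch {

    // Starts the given number of workers and returns the order in which steps were logged.
    static List<String> run(int workers) {
        List<String> log = new CopyOnWriteArrayList<>();
        Phaser phaser = new Phaser(workers);   // one party per worker thread
        Thread[] threads = new Thread[workers];

        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                log.add("step-1");
                phaser.arriveAndAwaitAdvance(); // block until all workers finished step 1
                log.add("step-2");
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // All "step-1" entries come before any "step-2" entry.
        System.out.println(run(3));
    }
}
```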

Note that each generation of a phaser has an associated phase number. The phase number starts at zero, and advances when all parties arrive at the phaser, wrapping around to zero after reaching Integer.MAX_VALUE.

Phaser termination

A phaser may enter a termination state, that may be checked using method isTerminated(). Upon termination, all synchronization methods immediately return without waiting for advance, as indicated by a negative return value. Similarly, attempts to register upon termination have no effect.
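
The effect of termination can be sketched as follows. The example uses forceTermination(), a Phaser method not covered above, to put the phaser into the terminated state directly. The class name TerminationSketch and the method run() are hypothetical.

```java
import java.util.concurrent.Phaser;

class TerminationSketch {

    static String run() {
        Phaser phaser = new Phaser(1);
        phaser.forceTermination();                     // enter the termination state

        boolean terminated = phaser.isTerminated();
        int arrival = phaser.arriveAndAwaitAdvance();  // returns immediately, negative value
        int registration = phaser.register();          // negative value, nothing is registered
        int registered = phaser.getRegisteredParties();

        return terminated + " " + (arrival < 0) + " " + (registration < 0) + " " + registered;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints true true true 1
    }
}
```

The registered count stays at 1 because the register() call after termination has no effect.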

Phaser Tiering

Phasers may be tiered (i.e., constructed in tree structures) to reduce contention. Phasers with large numbers of parties may experience heavy synchronization contention costs. Such a phaser may instead be set up as groups of sub-phasers which share a common parent. This may greatly increase throughput even though it incurs greater per-operation overhead.
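
A minimal sketch of tiering, assuming one root phaser with two child phasers of two parties each. The names TieringSketch, root, left and right are made up for illustration. The phase advances only when the parties of both children have arrived.

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;

class TieringSketch {

    static int[] run() {
        Phaser root = new Phaser();
        Phaser left = new Phaser(root, 2);    // each child registers itself with the root
        Phaser right = new Phaser(root, 2);
        int rootParties = root.getRegisteredParties();

        left.arrive();
        left.arrive();                         // left group is done, right group is not
        int phaseAfterLeft = root.getPhase();

        right.arrive();
        right.arrive();                        // both groups are done, the tree advances

        return new int[] {rootParties, phaseAfterLeft,
                root.getPhase(), left.getPhase(), right.getPhase()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [2, 0, 1, 1, 1]
    }
}
```

In real use the parties of each child would be separate threads; the arrivals are done on one thread here only to keep the sketch short.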

Phaser Monitoring

Phaser class has several methods for monitoring its state. These methods can be called by any caller, not only by registered parties.

  • getRegisteredParties() - Returns the number of parties registered at this phaser.
  • getArrivedParties() - Returns the number of registered parties that have arrived at the current phase of this phaser.
  • getUnarrivedParties() - Returns the number of registered parties that have not yet arrived at the current phase of this phaser.
  • getPhase() - Returns the current phase number.
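
As a quick sketch, the monitoring methods listed above can be read after one of three parties has arrived. The class name MonitoringSketch and the method snapshot() are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;

class MonitoringSketch {

    // Returns {registered, arrived, unarrived, phase} after a single arrival.
    static int[] snapshot() {
        Phaser phaser = new Phaser(3);
        phaser.arrive();                       // one of three parties arrives

        return new int[] {
                phaser.getRegisteredParties(),
                phaser.getArrivedParties(),
                phaser.getUnarrivedParties(),
                phaser.getPhase()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(snapshot())); // prints [3, 1, 2, 0]
    }
}
```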

Phaser example code

Let's try to make things clearer through an example. The application goes through three phases. In the first phase, let's say we have three threads reading 3 different files, parsing them and storing them in the DB. In the second phase those three threads deregister from the phaser. In the third phase 2 threads are started to query the DB table for the inserted records. Let's assume that one of the fields in the DB table is age: one thread should query the count of records having age greater than 40 and the other thread the count of those having age less than or equal to 40.

import java.util.concurrent.Phaser;

public class PhaserDemo {

    public static void main(String[] args) {
        Phaser ph = new Phaser(1);
        // Threads for first phase
        new FileReaderThread("thread-1", "file-1", ph);
        new FileReaderThread("thread-2", "file-2", ph);
        new FileReaderThread("thread-3", "file-3", ph);
        int curPhase;
        curPhase = ph.getPhase();
        ph.arriveAndAwaitAdvance();
        System.out.println("Phase " + curPhase + " completed");
        
        // This will be the second phase where 
        // threads are deregistered from the phaser
        curPhase = ph.getPhase();
        ph.arriveAndAwaitAdvance();
        System.out.println("Phase " + curPhase + " completed");
        
        // Threads for third phase
        new QueryThread("thread-1", 40, ph);
        new QueryThread("thread-2", 40, ph);
        curPhase = ph.getPhase();
        ph.arriveAndAwaitAdvance();
        System.out.println("Phase " + curPhase + " completed");
        // deregistering the main thread
        ph.arriveAndDeregister();
    }
}
class FileReaderThread implements Runnable {
    private String threadName;
    private String fileName;
    private Phaser ph;

    FileReaderThread(String threadName, String fileName, Phaser ph){
        this.threadName = threadName;
        this.fileName = fileName;
        this.ph = ph;
        ph.register();
        new Thread(this).start();
    }
    @Override
    public void run() {
        System.out.println("This is phase " + ph.getPhase());
        System.out.println("Reading file " + fileName + " thread " + threadName + "parsing and storing to DB ");
        // Using await and advance so that all threads wait here
        ph.arriveAndAwaitAdvance();
        try {
            // Just for creating some delay, not
            // actually required
            Thread.sleep(20);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not lost
            Thread.currentThread().interrupt();
        }
        System.out.println("Deregistering");
        ph.arriveAndDeregister();
    }
}
class QueryThread implements Runnable {
    private String threadName;
    private int param;
    private Phaser ph;
    
    QueryThread(String threadName, int param, Phaser ph){
        this.threadName = threadName;
        this.param = param;
        this.ph = ph;
        ph.register();
        new Thread(this).start();
    }
    
    @Override
    public void run() {
        System.out.println("This is phase " + ph.getPhase());
        System.out.println("Querying DB using param " + param + " Thread " + threadName);
        ph.arriveAndAwaitAdvance();
    }
}

Output

This is phase 0
This is phase 0
This is phase 0
Reading file file-2 thread thread-2parsing and storing to DB 
Reading file file-1 thread thread-1parsing and storing to DB 
Reading file file-3 thread thread-3parsing and storing to DB 
Phase 0 completed
Deregistering
Deregistering
Deregistering
Phase 1 completed
This is phase 2
Querying DB using param 40 Thread thread-1
This is phase 2
Querying DB using param 40 Thread thread-2
Phase 2 completed

Here it can be seen that first a Phaser instance ph is created with an initial party count of 1, which corresponds to the main thread. The ph object is also passed to the first set of 3 threads, used in the first phase, which register themselves with it and use it for synchronization. In the second phase those three threads are deregistered. In the third phase another set of two threads is created, which uses the same phaser object ph for synchronization. The main logic for reading the file, parsing it and storing it in the DB is not given here. The queries used in the second set of threads are not given either. The scenario is only meant to explain Phaser, so that's where the focus is.

Overriding onAdvance() method

If you want to perform an action before advancing from one phase to another, it can be done by overriding the onAdvance() method of the Phaser class. This method is invoked when the Phaser advances from one phase to another.
If this method returns true, this phaser will be set to a final termination state upon advance, and subsequent calls to isTerminated() will return true.
If this method returns false, phaser will be kept alive.

onAdvance() method

protected boolean onAdvance(int phase, int registeredParties)
Here
  • phase - current phase number on entry to this method, before this phaser is advanced.
  • registeredParties - the current number of registered parties.
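
To see which argument values reach onAdvance(), here is a small single-threaded sketch that records every call. The class name OnAdvanceArgsSketch, the method run() and the format of the recorded strings are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Phaser;

class OnAdvanceArgsSketch {

    static List<String> run() {
        List<String> calls = new ArrayList<>();
        Phaser phaser = new Phaser(2) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                calls.add("phase=" + phase + ", parties=" + registeredParties);
                return registeredParties == 0;  // terminate once no parties remain
            }
        };

        phaser.arrive();
        phaser.arrive();               // completes phase 0 with two parties
        phaser.arrive();
        phaser.arriveAndDeregister();  // completes phase 1, one party has left
        phaser.arriveAndDeregister();  // completes phase 2, no parties remain
        return calls;
    }

    public static void main(String[] args) {
        // prints [phase=0, parties=2, phase=1, parties=1, phase=2, parties=0]
        System.out.println(run());
    }
}
```

Note that registeredParties already reflects a deregistration performed by the arrival that completes the phase.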

One use case for overriding the onAdvance() method is to ensure that your phaser executes a given number of phases and then stops.

So we'll create a class called PhaserAdvance that will extend Phaser and override the onAdvance() method to ensure that the specified number of phases is executed.

Example code overriding onAdvance() method

import java.util.concurrent.Phaser;

public class PhaserAdvance extends Phaser {
    PhaserAdvance(int parties){
        super(parties);
    }
    
    // Overriding the onAdvance method
    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
        System.out.println("In onAdvance method, current phase which is completed is " + phase );
        // Terminate once 2 phases (phase 0 and phase 1) have completed,
        // or when the number of registered parties becomes zero
        if(phase == 1 || registeredParties == 0){
            System.out.println("phaser will be terminated ");
            return true;
        }else{
            System.out.println("phaser will continue ");
            return false;
        }     
    }
    
    public static void main(String... args) {
        // creating phaser instance
        PhaserAdvance ph = new PhaserAdvance(1);
        // creating three threads
        new TestThread("thread-1", ph);
        new TestThread("thread-2", ph);
        new TestThread("thread-3", ph);
        
        while (!ph.isTerminated()) {
            ph.arriveAndAwaitAdvance();
        }
        System.out.println("In main method, phaser is terminated");
    }
}


class TestThread implements Runnable {
    private String threadName;
    private Phaser ph;

    TestThread(String threadName, Phaser ph){
        this.threadName = threadName;
        this.ph = ph;
        // register new unarrived party to this phaser
        ph.register();
        new Thread(this).start();
    }
    @Override
    public void run() {
        // be in the loop till the phaser is terminated
        while (!ph.isTerminated()) {
            System.out.println("This is phase " + ph.getPhase() + " And Thread - "+ threadName);
            // Using await and advance so that all threads wait here
            ph.arriveAndAwaitAdvance();
        }
        
    }
}

Output

This is phase 0 And Thread - thread-1
This is phase 0 And Thread - thread-2
This is phase 0 And Thread - thread-3
In onAdvance method, current phase which is completed is 0
phaser will continue 
This is phase 1 And Thread - thread-3
This is phase 1 And Thread - thread-2
This is phase 1 And Thread - thread-1
In onAdvance method, current phase which is completed is 1
phaser will be terminated 
In main method, phaser is terminated

Here it can be seen that a new class PhaserAdvance is created extending the Phaser class. This PhaserAdvance class overrides the onAdvance() method of the Phaser class. The overridden onAdvance() method ensures that exactly 2 phases are executed, hence the if condition with phase == 1 (the phase count starts from 0).

That's all for this topic Phaser in Java. If you have any doubt or any suggestions to make please drop a comment. Thanks!


