Self Monitoring Processor Design for a State Machine Implementation


Using the Runnable and Callable interfaces together with the Java 6 thread pool classes, we can create a class design that not only executes a task but also monitors its execution progress periodically.

Here is what the interface looks like. Note that each processor has an associated 'State', and each state has a status, for example scheduled, started, awaiting_async, success, fail or timeout. This is because the IProcessor class is also used to implement a state machine. I won't go into that part in more detail here.
 
public interface IProcessor extends Runnable, Callable<ProcessingStatus> {

    ProcessingStatus getCurrentStatus();

    ProcessingState getCurrentState();

    ProcessingStatus execute();

    Future<?> getTask();

    void setScheduledFutureHandle(ScheduledFuture<?> handle);
}

Now we define an abstract class that does the basic work:
 

public abstract class Processor implements IProcessor, Comparable<Processor> {

    // Method which does the actual work
    @Override
    public ProcessingStatus call() throws InterruptedException {
        ..
        // Actual execution is delegated to the subclass
        currentStatus = execute();
        ...
    }

    // Method which is called from the scheduled thread pool and which
    // monitors the state of the task, e.g. if it is running beyond its
    // timeout, it is put into the timeout status. Basically, task status monitoring.
    @Override
    public void run() {
    }
}
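To make the split between call() and run() more concrete, here is a minimal, self-contained sketch of how one object might both do the work and watch itself. It is not the original Processor: the names SelfMonitoringSketch, MonitoredTask, Status, setMonitorHandle and demo are hypothetical, the status values are only modelled on the ones mentioned above, and the timeout handling (flag the status, leave the worker thread alone) is an assumption.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class SelfMonitoringSketch {

    // Hypothetical status values, modelled on the ones named in the text.
    enum Status { SCHEDULED, STARTED, SUCCESS, FAIL, TIMEOUT }

    // One object, two roles: call() does the work, run() checks on it.
    abstract static class MonitoredTask implements Runnable, Callable<Status> {
        private final long timeoutMillis;
        private final AtomicReference<Status> status =
                new AtomicReference<>(Status.SCHEDULED);
        private volatile long startedAt;
        private volatile ScheduledFuture<?> monitorHandle;

        MonitoredTask(long timeoutMillis) { this.timeoutMillis = timeoutMillis; }

        // Subclasses put the state-specific work here.
        protected abstract Status execute() throws Exception;

        void setMonitorHandle(ScheduledFuture<?> handle) { this.monitorHandle = handle; }

        @Override
        public Status call() {
            startedAt = System.currentTimeMillis();
            status.set(Status.STARTED);
            Status result;
            try {
                result = execute();
            } catch (Exception e) {
                result = Status.FAIL;
            }
            // Keep TIMEOUT if the monitor flagged the task in the meantime.
            status.compareAndSet(Status.STARTED, result);
            return status.get();
        }

        @Override
        public void run() {
            // Flag the task if it has been running for longer than the timeout.
            if (status.get() == Status.STARTED
                    && System.currentTimeMillis() - startedAt > timeoutMillis) {
                status.compareAndSet(Status.STARTED, Status.TIMEOUT);
            }
            // Once the task has reached a final status, stop the periodic check.
            Status now = status.get();
            ScheduledFuture<?> handle = monitorHandle;
            if (now != Status.SCHEDULED && now != Status.STARTED && handle != null) {
                handle.cancel(false);
            }
        }
    }

    // Runs a task that sleeps for workMillis and returns its final status.
    static Status demo(final long workMillis, long timeoutMillis) {
        ExecutorService workers = Executors.newFixedThreadPool(1);
        ScheduledExecutorService monitors = Executors.newScheduledThreadPool(1);
        MonitoredTask task = new MonitoredTask(timeoutMillis) {
            @Override
            protected Status execute() throws InterruptedException {
                Thread.sleep(workMillis);
                return Status.SUCCESS;
            }
        };
        try {
            // The cast picks submit(Callable), so the worker thread invokes call().
            Future<Status> result = workers.submit((Callable<Status>) task);
            // The scheduler invokes run() every 10 ms.
            task.setMonitorHandle(
                    monitors.scheduleAtFixedRate(task, 0, 10, TimeUnit.MILLISECONDS));
            return result.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            workers.shutdownNow();
            monitors.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(10, 5000)); // short task, generous timeout
        System.out.println(demo(300, 50));  // task outlives its timeout
    }
}
```

In this sketch the monitor only changes the status; it does not interrupt the worker thread. Whether a timed-out task should also be cancelled is a separate design decision.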
All the other Processor classes just have to inherit from it and implement the execute method:
 
public class ProcessorStateX extends Processor {

    /**
     * The execute method is called as part of the processor execution.
     */
    @Override
    public ProcessingStatus execute() {
        // do some StateX related work and return the resulting status
    }
}
These tasks are created and pushed to an input queue; from there a controlling class takes each task and adds it to a thread pool. Let us see how it is done:
 
public class StateController {

    private ExecutorService fxdpool = null;
    private ScheduledExecutorService pool = null;

    // Create the thread pools
    public void startStateController() {
        this.fxdpool = Executors.newFixedThreadPool(FIXED_THREADPOOL_COUNT_2);
        this.pool = Executors.newScheduledThreadPool(SCHEDULED_THREADPOOL_COUNT_4);
    }

    // The method that takes from the input queue and adds to the thread pools.
    // The processor parameter is what is present in the Task Completed queue.
    private void pollTaskQueueAndExecute(IProcessor processor) throws InterruptedException {

        // Pass in the current completed (succeeded or failed) processor and look up
        // a state flow XML to create the next processor, which represents the next
        // state in the state machine
        taskProcessor = getNextProcessor(processor);

        // Send it for execution; this will call Processor::call
        Future<?> unusedFuture = fxdpool.submit((Runnable) taskProcessor.getTask());

        if (unusedFuture.isCancelled()) {
            log.info("Task has been cancelled :" + taskProcessor.getTask());
        }

        // Create a scheduled checker for the task (task self-monitoring);
        // this will call Processor::run
        final ScheduledFuture<?> handle = pool.scheduleAtFixedRate(
                taskProcessor, 0, taskProcessor.getPeriodicExecutionInterval(),
                TimeUnit.MILLISECONDS);
    }
}
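One detail worth spelling out is why submitting getTask() as a Runnable ends up in call() rather than run(). The sketch below assumes that getTask() returns a FutureTask wrapping the processor as a Callable; that is a guess, since the implementation of getTask() is not shown. DualTask, SubmitDispatchSketch and demo are hypothetical names used only for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class SubmitDispatchSketch {

    // Hypothetical stand-in for a processor: implements both interfaces
    // and records which of its two entry points was invoked last.
    static class DualTask implements Runnable, Callable<String> {
        volatile String lastInvoked = "none";

        @Override
        public String call() {
            lastInvoked = "call";
            return "work done";
        }

        @Override
        public void run() {
            lastInvoked = "run";
        }
    }

    // Returns { entry point invoked, FutureTask result, result of submit()'s Future }.
    static String[] demo() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            DualTask dual = new DualTask();
            // pool.submit(dual) is normally rejected by the compiler as ambiguous,
            // because dual is both a Runnable and a Callable.
            // Assumption: getTask() hands out something like this FutureTask.
            FutureTask<String> task = new FutureTask<>((Callable<String>) dual);
            // FutureTask.run() invokes the wrapped call(), not DualTask.run().
            Future<?> submitted = pool.submit((Runnable) task);
            Object outer = submitted.get(); // completes with null for a Runnable
            return new String[] { dual.lastInvoked, task.get(), String.valueOf(outer) };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (String s : demo()) {
            System.out.println(s);
        }
    }
}
```

Under this assumption, the Future returned by submit only signals completion; the actual ProcessingStatus would have to be read from the Future returned by getTask().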
That's about it. And here is the method in the SiteController that causes the transition between the different states, which basically implements the FSM.
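The transition step can be pictured as a lookup keyed on the state that just finished and the status it finished with. The following is only a rough illustration of that idea, not the original method: it swaps the state flow XML for an in-memory map, and every name in it (FsmTransitionSketch, State, Status, addTransition, next, sampleFlow) is made up for the example.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.Optional;

public class FsmTransitionSketch {

    // Hypothetical states and final statuses for the illustration.
    enum State { STATE_A, STATE_B, DONE, ERROR }
    enum Status { SUCCESS, FAIL, TIMEOUT }

    // Transition table: (finished state, final status) -> next state.
    // In the design described above, this would come from the state flow XML.
    private final Map<State, Map<Status, State>> flow = new EnumMap<>(State.class);

    void addTransition(State from, Status on, State to) {
        flow.computeIfAbsent(from, k -> new EnumMap<Status, State>(Status.class))
            .put(on, to);
    }

    // Looks up the next state; empty if the flow defines no transition.
    Optional<State> next(State finished, Status outcome) {
        Map<Status, State> row = flow.get(finished);
        return row == null ? Optional.empty() : Optional.ofNullable(row.get(outcome));
    }

    // A small example flow: A -> B -> DONE, with failures and timeouts going to ERROR.
    static FsmTransitionSketch sampleFlow() {
        FsmTransitionSketch fsm = new FsmTransitionSketch();
        fsm.addTransition(State.STATE_A, Status.SUCCESS, State.STATE_B);
        fsm.addTransition(State.STATE_A, Status.FAIL, State.ERROR);
        fsm.addTransition(State.STATE_A, Status.TIMEOUT, State.ERROR);
        fsm.addTransition(State.STATE_B, Status.SUCCESS, State.DONE);
        fsm.addTransition(State.STATE_B, Status.FAIL, State.ERROR);
        return fsm;
    }

    public static void main(String[] args) {
        FsmTransitionSketch fsm = sampleFlow();
        System.out.println(fsm.next(State.STATE_A, Status.SUCCESS));
        System.out.println(fsm.next(State.DONE, Status.SUCCESS));
    }
}
```

A controller built along these lines would create the processor for the returned state and submit it to the pools, stopping when no transition is defined.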
