Sunday, October 9, 2016

Akka messages scheduling with Lambda actors

The official Akka documentation shows messages scheduling examples, which are implemented with the UntypedActor and hard-coded scheduling interval. This post demonstrates implementation of scheduling with Lambda actor and flexible interval.

Scheduling periodic messages on actor creation


The class SchedulerOnCreate implements an actor, which starts sending of periodic messages as soon as it receives the initial START message. It cancels sending periodic messages only on stop. Message sending interval in seconds is set in the actor constructor. For the real application interval value may be taken from a configuration.
The SchedulerOnCreate extends the AbstractLoggingActor, which implements a default logger. Calls to log() are used for tracing the actor execution.
 public class SchedulerOnCreate extends AbstractLoggingActor {  
      // Protocol  
      private static final String TICK = "tick";  
      private static final String START = "start";  
      private static final String ACTOR_SYSTEM = "MySystem";  
      private static final String ACTOR = "scheduler";  
      private Cancellable tick;  
      private long intervalSecs;  
      public static Props props(long intervalSecs) {  
           return Props.create(SchedulerOnCreate.class, () -> new SchedulerOnCreate(intervalSecs));  
      }  
      public SchedulerOnCreate(long intervalSecs) {  
           this.intervalSecs = intervalSecs;  
           receive(ReceiveBuilder  
                .matchEquals(TICK, this::onTick)  
                .matchEquals(START, m -> schedule())  
                .matchAny(m -> unhandled(m))  
                .build());  
      }  
      @Override  
      public void postStop() throws Exception {  
           tick.cancel();  
           super.postStop();  
      }  
      private void schedule() {  
        tick = getContext().system().scheduler().schedule(  
                  Duration.Zero(),   
                  Duration.create(intervalSecs, TimeUnit.SECONDS),  
                  self(),   
                  TICK,  
                  getContext().dispatcher(),   
                  null);  
      }       
      private void onTick(String tick) {  
           log().info("ticking ...");  
      }  
      public static void main( String[] args ) {  
          ActorSystem system = ActorSystem.create(ACTOR_SYSTEM);  
          ActorRef actor = system.actorOf(SchedulerOnCreate.props(10), ACTOR);  
          actor.tell(START, ActorRef.noSender());  
          terminate(system, actor);  
      }  
      private static void terminate(ActorSystem system, ActorRef actor) {  
           System.out.println("Enter to terminate");  
           try (Scanner input = new Scanner(System.in)) {  
                input.nextLine();  
                System.out.println("The end");  
                system.stop(actor);  
                system.terminate();  
                System.exit(0);  
           }            
      }  
 }  

Scheduling messages in chain


The class SchedulerOnReceive implements an actor, which sends each time a single message. The first message is sent on start - from the preStart method. The next message is scheduled upon receiving a previous message. The method postRestart is overridden in order to cancel a call to preStart, which is done by the root actors class AbstractActor.
 public class SchedulerOnReceive extends AbstractLoggingActor {  
      // Protocol  
      private static final String TICK = "tick";  
      private static final String ACTOR_SYSTEM = "MySystem";  
      private static final String ACTOR = "scheduler";  
      private long initialDelayMillis = 0L;  
      private long intervalSecs = 0L;  
      public static Props props(long initialDelayMillis, long intervalSecs) {  
           return Props.create(SchedulerOnReceive.class, () -> new SchedulerOnReceive(initialDelayMillis, intervalSecs));  
      }  
      public SchedulerOnReceive(long initialDelayMillis, long intervalSecs) {  
           this.initialDelayMillis = initialDelayMillis;  
           this.intervalSecs = intervalSecs;  
           receive(ReceiveBuilder  
                .matchEquals(TICK, this::onTick)  
                .matchAny(m -> unhandled(m))  
                .build());  
      }  
      @Override  
      public void preStart() throws Exception {  
           log().info("Scheduling the first tick..");  
           getContext().system().scheduler().scheduleOnce(  
                Duration.create(initialDelayMillis, TimeUnit.MILLISECONDS),  
                self(),   
                TICK,  
                getContext().dispatcher(),  
                null);  
      }  
      @Override  
      public void postRestart(Throwable reason) throws Exception {  
           // No call to preStart  
      }  
      private void onTick(String tick) {  
           log().info("Scheduling next tick..");  
           getContext().system().scheduler().scheduleOnce(  
                Duration.create(intervalSecs, TimeUnit.SECONDS),  
                self(),   
                TICK,  
                getContext().dispatcher(),  
                null);  
      }  
      public static void main( String[] args ) {  
           ActorSystem system = ActorSystem.create(ACTOR_SYSTEM);  
           ActorRef scheduler = system.actorOf(SchedulerOnReceive.props(500, 10), ACTOR);  
           terminate(system, actor);  
      }  
      private static void terminate(ActorSystem system, ActorRef actor) {  
           System.out.println("Enter to terminate");  
           try (Scanner input = new Scanner(System.in)) {  
                input.nextLine();  
                System.out.println("The end");  
                system.stop(actor);  
                system.terminate();  
                System.exit(0);  
           }            
      }  
 }  
Full sources on Git.

No comments :

About the author

My Photo
I trust only simple code and believe that code should be handsome. This is not a matter of technology, but professional approach, consolidated after years of software development. I enjoy to cause things working and feel very happy, when I manage to solve a problem.
Back to Top