Scheduling periodic messages on actor creation
The class SchedulerOnCreate implements an actor, which starts sending of periodic messages as soon as it receives the initial START message. It cancels sending periodic messages only on stop. Message sending interval in seconds is set in the actor constructor. For the real application interval value may be taken from a configuration.
The SchedulerOnCreate extends the AbstractLoggingActor, which implements a default logger. Calls to log() are used for tracing the actor execution.
public class SchedulerOnCreate extends AbstractLoggingActor {
// Protocol
private static final String TICK = "tick";
private static final String START = "start";
private static final String ACTOR_SYSTEM = "MySystem";
private static final String ACTOR = "scheduler";
private Cancellable tick;
private long intervalSecs;
public static Props props(long intervalSecs) {
return Props.create(SchedulerOnCreate.class, () -> new SchedulerOnCreate(intervalSecs));
}
public SchedulerOnCreate(long intervalSecs) {
this.intervalSecs = intervalSecs;
receive(ReceiveBuilder
.matchEquals(TICK, this::onTick)
.matchEquals(START, m -> schedule())
.matchAny(m -> unhandled(m))
.build());
}
@Override
public void postStop() throws Exception {
tick.cancel();
super.postStop();
}
private void schedule() {
tick = getContext().system().scheduler().schedule(
Duration.Zero(),
Duration.create(intervalSecs, TimeUnit.SECONDS),
self(),
TICK,
getContext().dispatcher(),
null);
}
private void onTick(String tick) {
log().info("ticking ...");
}
public static void main( String[] args ) {
ActorSystem system = ActorSystem.create(ACTOR_SYSTEM);
ActorRef actor = system.actorOf(SchedulerOnCreate.props(10), ACTOR);
actor.tell(START, ActorRef.noSender());
terminate(system, actor);
}
private static void terminate(ActorSystem system, ActorRef actor) {
System.out.println("Enter to terminate");
try (Scanner input = new Scanner(System.in)) {
input.nextLine();
System.out.println("The end");
system.stop(actor);
system.terminate();
System.exit(0);
}
}
}
Scheduling messages in chain
The class SchedulerOnReceive implements an actor, which sends each time a single message. The first message is sent on start - from the preStart method. The next message is scheduled upon receiving a previous message. The method postRestart is overridden in order to cancel a call to preStart, which is done by the root actors class AbstractActor.
public class SchedulerOnReceive extends AbstractLoggingActor {
// Protocol
private static final String TICK = "tick";
private static final String ACTOR_SYSTEM = "MySystem";
private static final String ACTOR = "scheduler";
private long initialDelayMillis = 0L;
private long intervalSecs = 0L;
public static Props props(long initialDelayMillis, long intervalSecs) {
return Props.create(SchedulerOnReceive.class, () -> new SchedulerOnReceive(initialDelayMillis, intervalSecs));
}
public SchedulerOnReceive(long initialDelayMillis, long intervalSecs) {
this.initialDelayMillis = initialDelayMillis;
this.intervalSecs = intervalSecs;
receive(ReceiveBuilder
.matchEquals(TICK, this::onTick)
.matchAny(m -> unhandled(m))
.build());
}
@Override
public void preStart() throws Exception {
log().info("Scheduling the first tick..");
getContext().system().scheduler().scheduleOnce(
Duration.create(initialDelayMillis, TimeUnit.MILLISECONDS),
self(),
TICK,
getContext().dispatcher(),
null);
}
@Override
public void postRestart(Throwable reason) throws Exception {
// No call to preStart
}
private void onTick(String tick) {
log().info("Scheduling next tick..");
getContext().system().scheduler().scheduleOnce(
Duration.create(intervalSecs, TimeUnit.SECONDS),
self(),
TICK,
getContext().dispatcher(),
null);
}
public static void main( String[] args ) {
ActorSystem system = ActorSystem.create(ACTOR_SYSTEM);
ActorRef scheduler = system.actorOf(SchedulerOnReceive.props(500, 10), ACTOR);
terminate(system, actor);
}
private static void terminate(ActorSystem system, ActorRef actor) {
System.out.println("Enter to terminate");
try (Scanner input = new Scanner(System.in)) {
input.nextLine();
System.out.println("The end");
system.stop(actor);
system.terminate();
System.exit(0);
}
}
}
Full sources on Git.
No comments :
Post a Comment