001// Licensed under the Apache License, Version 2.0 (the "License");
002// you may not use this file except in compliance with the License.
003// You may obtain a copy of the License at
004//
005// http://www.apache.org/licenses/LICENSE-2.0
006//
007// Unless required by applicable law or agreed to in writing, software
008// distributed under the License is distributed on an "AS IS" BASIS,
009// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
010// See the License for the specific language governing permissions and
011// limitations under the License.
012
013package org.apache.tapestry5.ioc.internal.services.cron;
014
015import org.apache.tapestry5.ioc.Invokable;
016import org.apache.tapestry5.ioc.annotations.PostInjection;
017import org.apache.tapestry5.ioc.internal.util.CollectionFactory;
018import org.apache.tapestry5.ioc.services.ParallelExecutor;
019import org.apache.tapestry5.ioc.services.RegistryShutdownHub;
020import org.apache.tapestry5.ioc.services.cron.PeriodicExecutor;
021import org.apache.tapestry5.ioc.services.cron.PeriodicJob;
022import org.apache.tapestry5.ioc.services.cron.Schedule;
023import org.slf4j.Logger;
024
025import java.util.List;
026import java.util.concurrent.atomic.AtomicInteger;
027import java.util.concurrent.locks.Lock;
028import java.util.concurrent.locks.ReentrantLock;
029
030public class PeriodicExecutorImpl implements PeriodicExecutor, Runnable
031{
032    private final ParallelExecutor parallelExecutor;
033
034    private final Logger logger;
035
036    // Synchronized by jobLock
037    private final List<Job> jobs = CollectionFactory.newList();
038
039    private final Thread thread = new Thread(this, "Tapestry PeriodicExecutor");
040
041    private transient boolean shutdown;
042
043    private static final long FIVE_MINUTES = 5 * 60 * 1000;
044
045    private final AtomicInteger jobIdAllocator = new AtomicInteger();
046
047    private final Lock jobLock = new ReentrantLock();
048
049    private class Job implements PeriodicJob, Invokable<Void>
050    {
051        final int jobId = jobIdAllocator.incrementAndGet();
052
053        private final Schedule schedule;
054
055        private final String name;
056
057        private final Runnable runnableJob;
058
059        private boolean executing, canceled;
060
061        private long nextExecution;
062
063        public Job(Schedule schedule, String name, Runnable runnableJob)
064        {
065            this.schedule = schedule;
066            this.name = name;
067            this.runnableJob = runnableJob;
068
069            nextExecution = schedule.firstExecution();
070        }
071
072        @Override
073        public String getName()
074        {
075            return name;
076        }
077
078        public long getNextExecution()
079        {
080            try
081            {
082                jobLock.lock();
083                return nextExecution;
084            } finally
085            {
086                jobLock.unlock();
087            }
088        }
089
090
091        @Override
092        public boolean isExecuting()
093        {
094            try
095            {
096                jobLock.lock();
097                return executing;
098            } finally
099            {
100                jobLock.unlock();
101            }
102        }
103
104        @Override
105        public boolean isCanceled()
106        {
107            try
108            {
109                jobLock.lock();
110                return canceled;
111            } finally
112            {
113                jobLock.unlock();
114            }
115        }
116
117        @Override
118        public void cancel()
119        {
120            try
121            {
122                jobLock.lock();
123
124                canceled = true;
125
126                if (!executing)
127                {
128                    removeJob(this);
129                }
130
131                // Otherwise, it will be caught when the job finishes execution.
132            } finally
133            {
134                jobLock.unlock();
135            }
136        }
137
138        @Override
139        public String toString()
140        {
141            StringBuilder builder = new StringBuilder("PeriodicJob[#").append(jobId);
142
143            builder.append(", (").append(name).append(")");
144
145            if (executing)
146            {
147                builder.append(", executing");
148            }
149
150            if (canceled)
151            {
152                builder.append(", canceled");
153            } else
154            {
155                builder.append(String.format(", next execution %Tk:%<TM:%<TS+%<TL", nextExecution));
156            }
157
158            return builder.append("]").toString();
159        }
160
161        /**
162         * Starts execution of the job; this sets the executing flag, calculates the next execution time,
163         * and uses the ParallelExecutor to run the job.
164         */
165        void start()
166        {
167            try
168            {
169                jobLock.lock();
170                executing = true;
171
172                // This is a bit naive; it assumes there will not be a delay waiting to execute. There's a lot of options
173                // here, such as basing the next execution on the actual start time, or event actual completion time, or allowing
174                // overlapping executions of the Job on a more rigid schedule.  Use Quartz.
175
176                nextExecution = schedule.nextExecution(nextExecution);
177
178                parallelExecutor.invoke(this);
179            } finally
180            {
181                jobLock.unlock();
182            }
183
184            if (logger.isTraceEnabled())
185            {
186                logger.trace(this + " sent for execution");
187            }
188        }
189
190        void cleanupAfterExecution()
191        {
192            try
193            {
194                if (logger.isTraceEnabled())
195                {
196                    logger.trace(this + " execution complete");
197                }
198
199                executing = false;
200
201                if (canceled)
202                {
203                    removeJob(this);
204                } else
205                {
206                    // Again, naive but necessary.
207                    thread.interrupt();
208                }
209            } finally
210            {
211                jobLock.unlock();
212            }
213        }
214
215        @Override
216        public Void invoke()
217        {
218            if (logger.isDebugEnabled())
219            {
220                logger.debug(String.format("Executing job #%d (%s)", jobId, name));
221            }
222
223            try
224            {
225                runnableJob.run();
226            } finally
227            {
228                cleanupAfterExecution();
229            }
230
231            return null;
232        }
233
234    }
235
236    public PeriodicExecutorImpl(ParallelExecutor parallelExecutor, Logger logger)
237    {
238        this.parallelExecutor = parallelExecutor;
239        this.logger = logger;
240    }
241
242    @PostInjection
243    public void start(RegistryShutdownHub hub)
244    {
245        hub.addRegistryShutdownListener(new Runnable()
246        {
247            @Override
248            public void run()
249            {
250                registryDidShutdown();
251            }
252        });
253
254        thread.start();
255    }
256
257
258    void removeJob(Job job)
259    {
260        if (logger.isDebugEnabled())
261        {
262            logger.debug("Removing " + job);
263        }
264
265        try
266        {
267            jobLock.lock();
268            jobs.remove(job);
269        } finally
270        {
271            jobLock.unlock();
272        }
273    }
274
275
276    @Override
277    public PeriodicJob addJob(Schedule schedule, String name, Runnable job)
278    {
279        assert schedule != null;
280        assert name != null;
281        assert job != null;
282
283        Job periodicJob = new Job(schedule, name, job);
284
285        try
286        {
287            jobLock.lock();
288
289            jobs.add(periodicJob);
290        } finally
291        {
292            jobLock.unlock();
293        }
294
295        if (logger.isDebugEnabled())
296        {
297            logger.debug("Added " + periodicJob);
298        }
299
300        // Wake the thread so that it can start the job, if necessary.
301
302        // Technically, this is only necessary if the new job is scheduled earlier
303        // than any job currently in the list of jobs, but this naive implementation
304        // is simpler.
305        thread.interrupt();
306
307        return periodicJob;
308    }
309
310    @Override
311    public void run()
312    {
313        while (!shutdown)
314        {
315            long nextExecution = executeCurrentBatch();
316
317            try
318            {
319                long delay = nextExecution - System.currentTimeMillis();
320
321                if (logger.isTraceEnabled())
322                {
323                    logger.trace(String.format("Sleeping for %,d ms", delay));
324                }
325
326                if (delay > 0)
327                {
328                    Thread.sleep(delay);
329                }
330            } catch (InterruptedException
331                    ex)
332            {
333                // Ignored; the thread is interrupted() to shut it down,
334                // or to have it execute a new batch.
335
336                logger.trace("Interrupted");
337            }
338        }
339    }
340
341    private void registryDidShutdown()
342    {
343        shutdown = true;
344
345        thread.interrupt();
346    }
347
348    /**
349     * Finds jobs and executes jobs that are ready to be executed.
350     *
351     * @return the next execution time (from the non-executing job that is scheduled earliest for execution).
352     */
353    private long executeCurrentBatch()
354    {
355        long now = System.currentTimeMillis();
356        long nextExecution = now + FIVE_MINUTES;
357
358        try
359        {
360            jobLock.lock();
361
362            for (Job job : jobs)
363            {
364                if (job.isExecuting())
365                {
366                    continue;
367                }
368
369                long jobNextExecution = job.getNextExecution();
370
371                if (jobNextExecution <= now)
372                {
373                    job.start();
374                } else
375                {
376                    nextExecution = Math.min(nextExecution, jobNextExecution);
377                }
378            }
379        } finally
380        {
381            jobLock.unlock();
382        }
383
384        return nextExecution;
385    }
386
387
388}