Quadcap Embeddable Server

com/quadcap/util/threads/PeriodicScheduler.java

Go to the documentation of this file.
00001 package com.quadcap.util.threads; 00002 00003 /* Copyright 1997 - 2003 Quadcap Software. All rights reserved. 00004 * 00005 * This software is distributed under the Quadcap Free Software License. 00006 * This software may be used or modified for any purpose, personal or 00007 * commercial. Open Source redistributions are permitted. Commercial 00008 * redistribution of larger works derived from, or works which bundle 00009 * this software requires a "Commercial Redistribution License"; see 00010 * http://www.quadcap.com/purchase. 00011 * 00012 * Redistributions qualify as "Open Source" under one of the following terms: 00013 * 00014 * Redistributions are made at no charge beyond the reasonable cost of 00015 * materials and delivery. 00016 * 00017 * Redistributions are accompanied by a copy of the Source Code or by an 00018 * irrevocable offer to provide a copy of the Source Code for up to three 00019 * years at the cost of materials and delivery. Such redistributions 00020 * must allow further use, modification, and redistribution of the Source 00021 * Code under substantially the same terms as this license. 00022 * 00023 * Redistributions of source code must retain the copyright notices as they 00024 * appear in each source code file, these license terms, and the 00025 * disclaimer/limitation of liability set forth as paragraph 6 below. 00026 * 00027 * Redistributions in binary form must reproduce this Copyright Notice, 00028 * these license terms, and the disclaimer/limitation of liability set 00029 * forth as paragraph 6 below, in the documentation and/or other materials 00030 * provided with the distribution. 00031 * 00032 * The Software is provided on an "AS IS" basis. No warranty is 00033 * provided that the Software is free of defects, or fit for a 00034 * particular purpose. 00035 * 00036 * Limitation of Liability. Quadcap Software shall not be liable 00037 * for any damages suffered by the Licensee or any third party resulting 00038 * from use of the Software. 00039 */ 00040 00041 import java.io.ByteArrayOutputStream; 00042 import java.io.PrintWriter; 00043 00044 import java.util.Date; 00045 import java.util.Enumeration; 00046 import java.util.Hashtable; 00047 import java.util.Vector; 00048 00049 import com.quadcap.util.DList; 00050 import com.quadcap.util.DListItem; 00051 import com.quadcap.util.Debug; 00052 import com.quadcap.util.ListException; 00053 00054 /** 00055 * This class acts as a scheduler for periodic tasks, which are named and 00056 * execute repeatedly with a specified interval. Each task is a 00057 * <a href="com.quadcap.util.threads.Command.html#_top_">Command</a> 00058 * object, as used by e.g., 00059 * <a href="com.quadcap.util.threads.StreamWorker.html#_top_"> 00060 * StreamWorker</a>.<p> 00061 * 00062 * Two flavors of this class are expressed here -- the first class 00063 * takes a <A href="com.quadcap.util.threads.Stream.html#_top_">Stream</a> 00064 * parameter. When each task becomes ready, the Command object is 00065 * written to the Stream, and it is presumed that a StreamWorker task 00066 * will be sitting at the other end of the Stream, executing the 00067 * Commands that come down the pipe.<p> 00068 * 00069 * The other flavor of this class takes a context argument, which can 00070 * be any Java object (i.e., this class doesn't ever look at the context.) 00071 * When each task becomes ready, it is simply executed within the 00072 * PeriodicScheduler thread. 00073 * 00074 * @author Stan Bailes 00075 */ 00076 public class PeriodicScheduler extends Thread implements DebugMonitor { 00077 DList active = new DList(); 00078 Stream stream = null; 00079 Object context = null; 00080 boolean terminate = false; 00081 Hashtable schedule = new Hashtable(); 00082 00083 /** 00084 * Construct a new PeriodicScheduler which will write Commands to the 00085 * specified Stream. 00086 * 00087 * @param stream the Stream used to write Commands when they become 00088 * ready. 00089 */ 00090 public PeriodicScheduler(ThreadGroup group, String name, Object obj) { 00091 super(group, name); 00092 if (obj instanceof Stream) { 00093 this.stream = (Stream)obj; 00094 } else { 00095 this.context = obj; 00096 } 00097 setDaemon(true); 00098 } 00099 00100 /** 00101 * Add a new task, with the specified name, action, and interval. 00102 * The task will <b>NOT</b> be executed 'now'; rather its first 00103 * execution will be scheduled at time 'now + interval'. 00104 * 00105 * @param name the name of this task. Any existing task with the 00106 * same name is replaced by this one. 00107 * @param c the Command action associated with this task. 00108 * @param interval the interval (in ms) between invocations of this 00109 * task. 00110 */ 00111 public void add(String name, Command c, long interval) { 00112 //Debug.println(3, "add " + name); 00113 synchronized (active) { 00114 ScheduleItem s = (ScheduleItem)schedule.get(name); 00115 if (s == null) { 00116 s = new ScheduleItem(name); 00117 schedule.put(name, s); 00118 } else { 00119 remove(s); 00120 } 00121 s.command = c; 00122 s.interval = interval; 00123 schedule(s); 00124 } 00125 } 00126 00127 /** 00128 * Add a new task, with the specified name, action, and interval. 00129 * The task will <b>NOT</b> be executed 'now'; rather its first 00130 * execution will be scheduled at time 'now + interval'. 00131 * 00132 * @param name the name of this task. Any existing task with the 00133 * same name is replaced by this one. 00134 * @param r the Runnable action associated with this task. 00135 * @param interval the interval (in ms) between invocations of this 00136 * task. 00137 */ 00138 public void add(String name, Runnable r, long interval) { 00139 //Debug.println(3, "add " + name); 00140 synchronized (active) { 00141 ScheduleItem s = (ScheduleItem)schedule.get(name); 00142 if (s == null) { 00143 s = new ScheduleItem(name); 00144 schedule.put(name, s); 00145 } else { 00146 remove(s); 00147 } 00148 s.runnable = r; 00149 s.interval = interval; 00150 schedule(s); 00151 } 00152 } 00153 00154 void remove(ScheduleItem item) { 00155 //Debug.println(3, "remove " + item.name); 00156 synchronized (active) { 00157 DListItem head = null; 00158 try { 00159 head = active.head(); 00160 } catch (ListException e) { 00161 } 00162 DListItem d = head; 00163 if (d != null) do { 00164 ScheduleItem s = (ScheduleItem)d.obj; 00165 if (item == s) { 00166 active.unlink(d); 00167 break; 00168 } 00169 d = d.next; 00170 } while (d != head); 00171 active.notify(); 00172 } 00173 } 00174 00175 /** 00176 * Add the specified schedule item to the queue. 00177 * 00178 * @param item the item to schedule 00179 */ 00180 void schedule(ScheduleItem item) { 00181 //Debug.println(6, "schedule " + item.name); 00182 item.checkDate(new Date()); 00183 synchronized (active) { 00184 DListItem head = null; 00185 try { 00186 head = active.head(); 00187 } catch (ListException e) { 00188 } 00189 DListItem d = head; 00190 if (d == null || item.date.before(((ScheduleItem)d.obj).date)) { 00191 active.addFront(item); 00192 } else { 00193 while (d.next != head) { 00194 d = d.next; 00195 ScheduleItem s = (ScheduleItem)d.obj; 00196 if (item.date.before(s.date)) break; 00197 } 00198 active.addBefore(d, item); 00199 } 00200 active.notify(); 00201 } 00202 } 00203 00204 /** 00205 * Thread method, runs the main scheduler event loop: Get the next 00206 * task to execute (the active list is sorted by execution time) and 00207 * wait that long. If another thread puts a new task in the queue, 00208 * we'll get interrupted, but than's ok. Then, look at the head of the 00209 * active list again and, if the task is ready, execute it. 00210 */ 00211 public void run() { 00212 //Debug.println(2, "starting: " + getName()); 00213 while (!terminate) { 00214 ScheduleItem s = null; 00215 synchronized (active) { 00216 DListItem head = null; 00217 try { 00218 head = active.head(); 00219 } catch (ListException e) { 00220 } 00221 00222 try { 00223 if (head == null) { 00224 active.wait(); 00225 } else { 00226 long ms = msUntil(((ScheduleItem)head.obj).date); 00227 //Debug.println(6, "Next scheduled event in " + ms + " ms"); 00228 if (ms > 0) active.wait(ms); 00229 } 00230 } catch (InterruptedException e) { 00231 } 00232 00233 try { 00234 head = active.head(); 00235 s = (ScheduleItem)head.obj; 00236 if (s.ready()) { 00237 active.popFront(); 00238 } else { 00239 s = null; 00240 } 00241 } catch (ListException e) { 00242 s = null; 00243 } 00244 } 00245 00246 if (s != null) { 00247 try { 00248 if (stream != null) { 00249 //Debug.println(6, "Dispatching: " + s.name); 00250 if (s.runnable != null) { 00251 stream.write(s.runnable); 00252 } else { 00253 stream.write(s.command); 00254 } 00255 } 00256 else if (context != null) { 00257 //Debug.println(6, "Running: " + s.name); 00258 if (s.runnable != null) { 00259 if (!s.isRunning()) { 00260 new Thread(s.runnable).start(); 00261 } 00262 } else { 00263 s.command.execute(context); 00264 } 00265 //Debug.println(6, "Done: " + s.name); 00266 } 00267 } catch (Exception e) { 00268 Debug.print(e); 00269 } 00270 schedule(s); 00271 } 00272 } 00273 //Debug.println(2, "terminating " + getName()); 00274 } 00275 00276 /** 00277 * Handle a debug monitor command directed at this instance. 00278 */ 00279 public int handleInteractiveCommand(String cmd, Vector args) { 00280 if (cmd.equals("list")) { 00281 Enumeration enum = schedule.elements(); 00282 while (enum.hasMoreElements()) { 00283 ScheduleItem s = (ScheduleItem)enum.nextElement(); 00284 System.out.println("agent: " + s.name + ", interval = " + 00285 s.interval + " ms, next scheduled: " + 00286 s.date); 00287 } 00288 } else if (cmd.equals("queue")) { 00289 synchronized(active) { 00290 DListItem head = null; 00291 try { 00292 head = active.head(); 00293 } catch (ListException e) { 00294 } 00295 DListItem d = head; 00296 if (d != null) { 00297 do { 00298 ScheduleItem s = (ScheduleItem)d.obj; 00299 System.out.println("agent: " + s.name + 00300 ", interval = " + 00301 s.interval + 00302 " ms, next scheduled: " + 00303 s.date); 00304 d = d.next; 00305 } while (d != head); 00306 } 00307 } 00308 } else { 00309 System.out.println("usage: "); 00310 System.out.println(" list"); 00311 return -1; 00312 } 00313 return 0; 00314 } 00315 00316 public String toString() { 00317 ByteArrayOutputStream bos = new ByteArrayOutputStream(); 00318 PrintWriter pos = new PrintWriter(bos); 00319 synchronized(active) { 00320 DListItem head = null; 00321 try { 00322 head = active.head(); 00323 } catch (ListException e) { 00324 } 00325 DListItem d = head; 00326 if (d != null) { 00327 do { 00328 ScheduleItem s = (ScheduleItem)d.obj; 00329 pos.println("agent: " + s.name); 00330 pos.println(" interval: " + s.interval + " ms"); 00331 pos.println(" next: " + s.date); 00332 pos.println(" running: " + s.isRunning()); 00333 d = d.next; 00334 } while (d != head); 00335 } 00336 } 00337 pos.flush(); 00338 return bos.toString(); 00339 } 00340 00341 /** 00342 * How many ms between now and then (d)? 00343 * 00344 * @param d then 00345 * @return then - now 00346 */ 00347 long msUntil(Date d) { 00348 long now = new Date().getTime(); 00349 long then = d.getTime(); 00350 return then - now; 00351 } 00352 00353 /** 00354 * Terminate this scheduler. 00355 */ 00356 public void terminate() { 00357 terminate = true; 00358 synchronized (active) { 00359 active.notify(); 00360 } 00361 } 00362 00363 } 00364 00365 /** 00366 * This class is private to PeriodicScheduler: it represents a single 00367 * scheduled item, and gets placed in the queue and the hashtable. 00368 */ 00369 class ScheduleItem implements Runnable { 00370 Command command = null; 00371 Runnable runnable = null; 00372 String name = null; 00373 Date date = new Date(); 00374 long interval = 5000; 00375 boolean running = false; 00376 00377 ScheduleItem(String name) { 00378 this.name = name; 00379 } 00380 00381 void checkDate(Date now) { 00382 if (interval < 0) throw new RuntimeException("negative interval"); 00383 //date.setTime(now.getTime() + interval); 00384 date.setTime(date.getTime() + interval); 00385 if (date.before(now)) 00386 date.setTime(now.getTime() + interval); 00387 } 00388 00389 boolean ready() { 00390 return date.before(new Date()); 00391 } 00392 00393 boolean isRunning() { 00394 return running; 00395 } 00396 00397 public void run() { 00398 running = true; 00399 try { 00400 runnable.run(); 00401 } catch (Throwable t) { 00402 Debug.print(t); 00403 } finally { 00404 running = false; 00405 } 00406 } 00407 }