View Javadoc

1   package org.jumpmind.symmetric.util;
2   
3   import java.io.FilterOutputStream;
4   import java.io.IOException;
5   import java.io.OutputStream;
6   
7   /***
8    * Throttle output stream to write at a specified rate. the rate will be an
9    * average
10   * 
11   * @author hwang
12   */
13  public class MeteredOutputStream extends FilterOutputStream {
14  
15      // max allowed, this will be an average over time. for small packages, it
16      // will be consume the whole bandwidth
17      private long maxBps;
18  
19      // total bytes send through
20      private long bytesSent;
21  
22      // start time when stream started output
23      private long startTime;
24  
25      // flag to determine whether stream started output
26      private boolean started = false;
27  
28      // threshold before throttling in number of bytes
29      private long threshold = DEFFAULT_THRESHOLD;
30  
31      // frequency to recalculation rate in number of bytes
32      private long checkPoint = DEFAULT_CHECK_POINT;
33  
34      // default threshold before throttling in number of bytes
35      private static final long DEFFAULT_THRESHOLD = 8192L;
36  
37      // default frequency to recalculation rate in number of bytes
38      private static final long DEFAULT_CHECK_POINT = 1024L;
39  
40      /***
41       * @param out
42       *                stream written to
43       * @param maxBps
44       *                max number of bytes per second
45       * @param threshold
46       *                the number in bytes before the throttle output stream
47       */
48      public MeteredOutputStream(OutputStream out, long maxBps, long threshold) {
49          super(out);
50          this.maxBps = maxBps;
51          bytesSent = 0;
52          this.threshold = threshold <= DEFFAULT_THRESHOLD ? DEFFAULT_THRESHOLD : threshold;
53  
54          checkPoint = DEFAULT_CHECK_POINT;
55      }
56  
57      /***
58       * @param out
59       *                out stream written to
60       * @param maxBps
61       *                max number of bytes per second
62       * @param threshold
63       *                the number in bytes before throttling output stream
64       * @param checkPoint
65       *                check the average rate when total byts%checkPoint == 0.
66       *                <br>
67       *                the throttled output stream will write checkPoing number
68       *                of bytes at full speed, and then sleep for a certain to
69       *                obtain an average close to maxBps. If set it to 1, it will
70       *                check every bytes written and the rate will be the most
71       *                accurate.
72       */
73      public MeteredOutputStream(OutputStream out, long maxBps, long threshold, long checkPoint) {
74          super(out);
75          this.maxBps = maxBps;
76          bytesSent = 0;
77          this.threshold = threshold <= DEFFAULT_THRESHOLD ? DEFFAULT_THRESHOLD : threshold;
78          this.checkPoint = checkPoint;
79      }
80  
81      /***
82       * @param out
83       * @param maxBps
84       */
85      public MeteredOutputStream(OutputStream out, long maxBps) {
86          super(out);
87          this.maxBps = maxBps;
88          bytesSent = 0;
89          this.threshold = DEFFAULT_THRESHOLD;
90          checkPoint = DEFAULT_CHECK_POINT;
91      }
92  
93      @Override
94      public void write(int b) throws IOException {
95          // check if stream start output, if not, set started to true and set the
96          // start time
97          bytesSent += 1;
98          if (!started) {
99              started = true;
100             startTime = System.currentTimeMillis();
101         }
102         // only check when total bytes greater than limit and adjust at
103         // checkPoint
104         if (bytesSent >= threshold && (bytesSent % checkPoint == 0)) {
105             long elapsed = System.currentTimeMillis() - startTime + 1;
106             long currentBps = bytesSent * 1000L / elapsed;
107             if (currentBps > maxBps) {
108 
109                 long expected = bytesSent * 1000L / maxBps;
110                 try {
111                     Thread.sleep(expected - elapsed);
112                 } catch (InterruptedException e) {
113                 }
114             }
115         }
116         out.write(b);
117 
118     }
119 }