1 package org.jumpmind.symmetric.util;
2
3 import java.io.FilterOutputStream;
4 import java.io.IOException;
5 import java.io.OutputStream;
6
/**
 * Output stream filter that throttles writes so that the average throughput,
 * measured from the first byte written, stays close to a configured maximum
 * number of bytes per second.
 * <p>
 * Throttling only starts once a threshold number of bytes has been written,
 * and the rate is only evaluated when the running byte count is a multiple
 * of the check point. Between two evaluations bytes are passed through at
 * full speed; when the measured average is too high the writing thread
 * sleeps long enough to bring the average back down to the maximum.
 * <p>
 * A non-positive maximum rate disables throttling altogether.
 *
 * @author hwang
 */
public class MeteredOutputStream extends FilterOutputStream {

    /** Minimum (and default) number of bytes written before throttling starts. */
    private static final long DEFAULT_THRESHOLD = 8192L;

    /** Default interval, in bytes, between two rate evaluations. */
    private static final long DEFAULT_CHECK_POINT = 1024L;

    /** Conversion factor from {@link System#nanoTime()} units to milliseconds. */
    private static final long NANOS_PER_MILLI = 1000000L;

    /** Maximum average rate in bytes per second; non-positive means unlimited. */
    private final long maxBps;

    /** Number of bytes that must have been written before the rate is checked. */
    private final long threshold;

    /** The rate is checked whenever {@code bytesSent % checkPoint == 0}. */
    private final long checkPoint;

    /** Total number of bytes passed to {@link #write(int)} so far. */
    private long bytesSent;

    /** {@link System#nanoTime()} reading taken on the first write. */
    private long startNanos;

    /** Whether the first write has happened and {@link #startNanos} is set. */
    private boolean started;

    /**
     * Creates a throttled stream that evaluates the rate every 1024 bytes.
     *
     * @param out
     *            stream written to
     * @param maxBps
     *            max number of bytes per second; a value of zero or less
     *            disables throttling
     * @param threshold
     *            number of bytes written before throttling starts; values
     *            below 8192 are raised to 8192
     */
    public MeteredOutputStream(OutputStream out, long maxBps, long threshold) {
        this(out, maxBps, threshold, DEFAULT_CHECK_POINT);
    }

    /**
     * Creates a throttled stream with an explicit check point.
     *
     * @param out
     *            stream written to
     * @param maxBps
     *            max number of bytes per second; a value of zero or less
     *            disables throttling
     * @param threshold
     *            number of bytes written before throttling starts; values
     *            below 8192 are raised to 8192
     * @param checkPoint
     *            the average rate is checked when
     *            {@code total bytes % checkPoint == 0}. <br>
     *            The stream writes {@code checkPoint} bytes at full speed and
     *            then sleeps for as long as needed to obtain an average close
     *            to {@code maxBps}. If set to 1, the rate is checked on every
     *            byte written and is the most accurate. A value of zero or
     *            less falls back to the default of 1024.
     */
    public MeteredOutputStream(OutputStream out, long maxBps, long threshold, long checkPoint) {
        super(out);
        this.maxBps = maxBps;
        this.threshold = Math.max(threshold, DEFAULT_THRESHOLD);
        // A zero check point would make the modulo in write(int) divide by zero.
        this.checkPoint = checkPoint > 0 ? checkPoint : DEFAULT_CHECK_POINT;
    }

    /**
     * Creates a throttled stream with the default threshold (8192 bytes) and
     * the default check point (1024 bytes).
     *
     * @param out
     *            stream written to
     * @param maxBps
     *            max number of bytes per second; a value of zero or less
     *            disables throttling
     */
    public MeteredOutputStream(OutputStream out, long maxBps) {
        this(out, maxBps, DEFAULT_THRESHOLD, DEFAULT_CHECK_POINT);
    }

    /**
     * Writes a single byte to the underlying stream, sleeping first when the
     * average rate since the first write exceeds the configured maximum.
     * <p>
     * If the thread is interrupted while sleeping, the sleep is cut short,
     * the interrupt status of the thread is restored and the byte is still
     * written.
     *
     * @param b
     *            the byte to write
     * @throws IOException
     *             if the underlying stream fails to write the byte
     */
    @Override
    public void write(int b) throws IOException {
        // The byte is counted before it is written so that the rate check
        // below already accounts for it.
        bytesSent++;
        if (!started) {
            started = true;
            startNanos = System.nanoTime();
        }

        if (maxBps > 0 && bytesSent >= threshold && bytesSent % checkPoint == 0) {
            throttle();
        }
        out.write(b);
    }

    /**
     * Sleeps for as long as needed to bring the average rate down to maxBps.
     */
    private void throttle() {
        // nanoTime never runs backwards, so elapsed is always at least 1 ms
        // and the divisions below cannot divide by zero.
        long elapsed = (System.nanoTime() - startNanos) / NANOS_PER_MILLI + 1;
        long currentBps = bytesSent * 1000L / elapsed;
        if (currentBps <= maxBps) {
            return;
        }

        // Time, in milliseconds, the bytes sent so far should have taken at
        // exactly maxBps.
        long expected = bytesSent * 1000L / maxBps;
        long pause = expected - elapsed;
        if (pause <= 0) {
            return;
        }
        try {
            Thread.sleep(pause);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of losing it.
            Thread.currentThread().interrupt();
        }
    }
}