/* http://www.5a520.cn http://www.bt285.cn */
package instream;

import java.net.URL;
import java.net.URLDecoder;

import decode.header;
import tag.mp3tag;
import tag.tagthread;

/**
 * Buffered, read-only access to the content behind a URL.
 *
 * <p>The data is held in a ring buffer of {@link #unit_count} blocks of
 * {@link #unit_length} bytes each. {@code writer} objects are handed a free
 * block through {@link #trywriting(writer)} and report the bytes they stored
 * through {@link #updatebuffer(int, int)}; the reading side consumes the
 * buffer through {@link #read()}, {@link #read(byte[], int, int)} and
 * {@link #dump(int, byte[], int, int)}.
 *
 * <p>Two monitors are used: {@code buf} is the lock on which the reader waits
 * for data, and {@code this} is the lock on which writers wait for a free
 * block.
 *
 * <p>All buffer state is held in static fields that every constructor call
 * re-initialises, so only one instance can usefully be active at a time.
 *
 * <p>NOTE(review): project-local identifiers are kept in the casing found in
 * this file; confirm them against their declarations.
 */
public final class buffrandacceurl implements irandomaccess, iwritercallback {
    public static final int unit_length_bits = 15;                       // 32K
    public static final int unit_length = 1 << unit_length_bits;
    public static final int buf_length = unit_length << 4;               // 16 blocks
    public static final int unit_count = buf_length >> unit_length_bits;
    public static final int buf_length_mask = (buf_length - 1);
    private static final int max_writer = 8;

    private static long file_pointer;
    private static int read_pos;
    private static int fill_bytes;
    private static byte[] buf;       // also the reader/writer sync lock: buf.wait()/buf.notify()
    private static int[] buf_bytes;  // unread bytes remaining in each block
    private static int buf_index;    // next block to hand to a writer
    private static int alloc_pos;    // next file offset to hand to a writer
    private static URL url = null;
    private static boolean isalive = true;
    private static int writer_count;
    private static int await_count;

    private long file_length;
    private long frame_bytes;

    /**
     * Opens {@code surl} using the default number of writers.
     *
     * @param surl the URL to read
     * @throws Exception if the URL is malformed or the content length is reported as 0
     */
    public buffrandacceurl(String surl) throws Exception {
        this(surl, max_writer);
    }

    /**
     * Opens {@code surl}, creates the writers, waits for the initial cache
     * fill and skips a leading ID3v2 tag if one is detected.
     *
     * @param surl             the URL to read
     * @param download_threads number of {@code writer} objects to create
     * @throws Exception if the URL is malformed, the content length is
     *                   reported as 0, or the wait for data is interrupted
     */
    public buffrandacceurl(String surl, int download_threads) throws Exception {
        buf = new byte[buf_length];
        buf_bytes = new int[unit_count];
        url = new URL(surl);

        // Create the thread that parses ID3 asynchronously.
        new tagthread(url);

        // Print the current file name.
        try {
            String s = URLDecoder.decode(surl, "gbk");
            System.out.println("start>> " + s.substring(s.lastIndexOf("/") + 1));
            s = null;
        } catch (Exception e) {
            // Decoding is best-effort: fall back to the raw URL.
            System.out.println("start>> " + surl);
        }

        // Create the "write" threads.
        for (int i = 0; i < download_threads; i++) {
            new writer(this, url, buf, i + 1);
        }

        frame_bytes = file_length = httpreader.getcontentlength();
        if (file_length == 0) {
            header.strlasterr = "连接url出错,重试 " + httpreader.max_retry + " 次后放弃。";
            throw new Exception("retry " + httpreader.max_retry);
        }
        writer_count = download_threads;

        // Buffer.
        try_cache();

        // Skip ID3 v2.
        mp3tag tag = new mp3tag();
        int v2_size = tag.checkid3v2(buf, 0);
        if (v2_size > 0) {
            frame_bytes -= v2_size;
            // seek(v2_size):
            fill_bytes -= v2_size;
            file_pointer = v2_size;
            read_pos = v2_size;
            read_pos &= buf_length_mask;
            int units = v2_size >> unit_length_bits;
            // Blocks lying entirely inside the tag are fully consumed.
            for (int i = 0; i < units; i++) {
                buf_bytes[i] = 0;
                this.notifywriter();
            }
            // Only the part of the tag that lies inside the last touched block
            // is consumed from it; subtracting the whole tag size would
            // over-count whenever the tag spans more than one block.
            // NOTE(review): a tag of buf_length bytes or more would still
            // index past buf_bytes here, as it did before.
            buf_bytes[units] -= v2_size & (unit_length - 1);
            this.notifywriter();
        }
        tag = null;
    }

    /**
     * Blocks until the buffer holds the target amount of data, or until no
     * writer is left or the instance has been terminated. Prints a progress
     * percentage while waiting.
     *
     * @throws InterruptedException if the wait on {@code buf} is interrupted
     */
    private void try_cache() throws InterruptedException {
        int cache_size = buf_length;
        if (cache_size > (int) file_length - alloc_pos) {
            cache_size = (int) file_length - alloc_pos;
        }
        cache_size -= unit_length;

        // Wait for the buffer to fill up.
        while (fill_bytes < cache_size) {
            if (writer_count == 0 || isalive == false) {
                return;
            }
            if (buf_length > (int) file_length - alloc_pos) {
                cache_size = (int) file_length - alloc_pos - unit_length;
            }
            System.out.printf("\r[缓冲%1$6.2f%%] ", (float) fill_bytes / cache_size * 100);
            synchronized (buf) {
                // Re-check under the lock: every notifier changes these fields
                // while holding buf, so a notification sent after the checks
                // above can no longer be missed.
                if (fill_bytes < cache_size && writer_count != 0 && isalive) {
                    buf.wait();
                }
            }
        }
        System.out.printf("\r");
    }

    /**
     * Waits until block {@code i} together with its successor holds at least
     * {@code len} unread bytes.
     *
     * @param i   index of the block the read starts in
     * @param len number of bytes wanted
     * @return {@code len} on success, or the smaller available count if no
     *         writer is left or the instance has been terminated
     * @throws Exception if the wait is interrupted
     */
    private int try_reading(int i, int len) throws Exception {
        int n = (i == unit_count - 1) ? 0 : (i + 1);
        int r = (buf_bytes[i] == 0) ? 0 : (buf_bytes[i] + buf_bytes[n]);
        while (r < len) {
            if (writer_count == 0 || isalive == false) {
                return r;
            }
            try_cache();
            r = (buf_bytes[i] == 0) ? 0 : (buf_bytes[i] + buf_bytes[n]);
        }
        return len;
    }

    /**
     * The "write" threads wait here, mutually exclusive, for a free block.
     * On success the writer is assigned a file offset and a block index.
     *
     * @param w the writer requesting a block
     * @return {@code false} if the writer should stop (enough download speed,
     *         nothing left to allocate, or the instance was terminated)
     * @throws InterruptedException if the wait is interrupted
     */
    public synchronized boolean trywriting(writer w) throws InterruptedException {
        await_count++;
        while (buf_bytes[buf_index] != 0 && isalive) {
            this.wait();
        }

        // End one "write" thread when the download speed is sufficient.
        if (writer_count > 1 && w.await_count >= await_count
                && w.await_count >= writer_count) {
            return false;
        }

        if (alloc_pos >= file_length) {
            return false;
        }
        w.await_count = await_count;
        await_count--;
        w.start_pos = alloc_pos;
        w.index = buf_index;
        alloc_pos += unit_length;
        buf_index = (buf_index == unit_count - 1) ? 0 : buf_index + 1;
        return isalive;
    }

    /**
     * Records that block {@code i} now holds {@code len} bytes and wakes a
     * thread waiting on {@code buf}.
     *
     * @param i   block index
     * @param len number of bytes stored in the block
     */
    public void updatebuffer(int i, int len) {
        synchronized (buf) {
            buf_bytes[i] = len;
            fill_bytes += len;
            buf.notify();
        }
    }

    /** Decrements the live writer count and wakes a thread waiting on {@code buf}. */
    public void updatewritercount() {
        synchronized (buf) {
            writer_count--;
            buf.notify();
        }
    }

    /** Wakes every thread waiting on this object's monitor in {@link #trywriting(writer)}. */
    public synchronized void notifywriter() {
        this.notifyAll();
    }

    /**
     * Marks the instance as no longer alive, records a timeout message the
     * first time it is called, and wakes both the reader and the writers.
     */
    public void terminatewriters() {
        synchronized (buf) {
            if (isalive) {
                isalive = false;
                header.strlasterr = "读取文件超时。重试 " + httpreader.max_retry + " 次后放弃,请您稍后再试。";
            }
            buf.notify();
        }
        notifywriter();
    }

    /**
     * Reads one byte and advances the file pointer.
     *
     * @return the byte as a value in 0..255, or -1 if no writer is left or
     *         the instance has been terminated
     * @throws Exception if the wait for data is interrupted
     */
    public int read() throws Exception {
        int iret = -1;
        int i = read_pos >> unit_length_bits;

        // 1. "Wait" until 1 byte is readable.
        while (buf_bytes[i] < 1) {
            try_cache();
            // Also leave once terminated: try_cache() returns immediately in
            // that state, so looping on would only busy-spin.
            if (writer_count == 0 || isalive == false) {
                return -1;
            }
        }
        if (isalive == false) {
            return -1;
        }

        // 2. Read.
        iret = buf[read_pos] & 0xff;
        fill_bytes--;
        file_pointer++;
        read_pos++;
        read_pos &= buf_length_mask;
        if (--buf_bytes[i] == 0) {
            notifywriter();     // 3. Notify.
        }

        return iret;
    }

    /**
     * Reads up to {@code b.length} bytes into {@code b}.
     *
     * @param b destination array
     * @return the number of bytes read, or -1 on failure
     * @throws Exception if the wait for data is interrupted
     */
    public int read(byte b[]) throws Exception {
        return read(b, 0, b.length);
    }

    /**
     * Reads {@code len} bytes (capped at {@link #unit_length}) into {@code b}
     * starting at {@code off}, and advances the file pointer.
     *
     * @param b   destination array
     * @param off start offset in {@code b}
     * @param len number of bytes requested
     * @return the number of bytes read, or -1 if that many bytes could not be
     *         obtained or the instance has been terminated
     * @throws Exception if the wait for data is interrupted
     */
    public int read(byte[] b, int off, int len) throws Exception {
        if (len > unit_length) {
            len = unit_length;
        }
        int i = read_pos >> unit_length_bits;

        // 1. "Wait" until enough content is readable.
        if (try_reading(i, len) < len || isalive == false) {
            return -1;
        }

        // 2. Read.
        int tail_len = buf_length - read_pos; // write_pos != buf_length
        if (tail_len < len) {
            // The request wraps around the end of the ring buffer.
            System.arraycopy(buf, read_pos, b, off, tail_len);
            System.arraycopy(buf, 0, b, off + tail_len, len - tail_len);
        } else {
            System.arraycopy(buf, read_pos, b, off, len);
        }

        fill_bytes -= len;
        file_pointer += len;
        read_pos += len;
        read_pos &= buf_length_mask;
        buf_bytes[i] -= len;
        if (buf_bytes[i] < 0) {
            // The read spilled into the next block: charge the excess to it.
            int ni = read_pos >> unit_length_bits;
            buf_bytes[ni] += buf_bytes[i];
            buf_bytes[i] = 0;
            notifywriter();
        } else if (buf_bytes[i] == 0) {
            notifywriter();
        }

        return len;
    }

    /**
     * Copies starting at offset {@code src_off} from the current read
     * position, without moving the file "pointer".
     *
     * @param src_off offset from the current read position
     * @param b       destination array
     * @param dst_off start offset in {@code b}
     * @param len     number of bytes to copy
     * @return {@code len}, or -1 if that many bytes could not be obtained or
     *         the instance has been terminated
     * @throws Exception if the wait for data is interrupted
     */
    public int dump(int src_off, byte b[], int dst_off, int len) throws Exception {
        // Wrap into the ring: an unwrapped position at or past buf_length
        // would index beyond buf_bytes and yield a negative tail_len.
        int rpos = (read_pos + src_off) & buf_length_mask;
        if (try_reading(rpos >> unit_length_bits, len) < len || isalive == false) {
            return -1;
        }
        int tail_len = buf_length - rpos;
        if (tail_len < len) {
            System.arraycopy(buf, rpos, b, dst_off, tail_len);
            System.arraycopy(buf, 0, b, dst_off + tail_len, len - tail_len);
        } else {
            System.arraycopy(buf, rpos, b, dst_off, len);
        }
        // No signal is sent.
        return len;
    }

    /** @return the content length obtained from {@code httpreader} in the constructor */
    public long length() {
        return file_length;
    }

    /** @return the number of bytes consumed so far, including a skipped ID3v2 tag */
    public long getfilepointer() {
        return file_pointer;
    }

    /** Does nothing. */
    public void close() {
        //
    }

    /**
     * Does nothing; seeking is not implemented.
     *
     * @param pos ignored
     */
    public void seek(long pos) throws Exception {
        //
    }
}