简要介绍实现多线程环形缓冲的方法

天地人dC

天地人dC

2016-02-20 01:02

今天图老师小编给大家展示的是简要介绍实现多线程环形缓冲的方法,精心挑选的内容希望大家多多支持、多多分享,喜欢就赶紧get哦!

 

我平时比较喜欢从网上听歌,有些链接下载速度太慢了。如果用HttpURLConnection类的方法打开连接,然后用InputStream类获得输入流,再用BufferedInputStream构造出带缓冲区的输入流,如果网速太慢的话,无论缓冲区设置多大,听起来都是断断续续的,达不到真正缓冲的目的。于是尝试编写代码实现用缓冲方式读取远程文件,以下贴出的代码是我写的MP3解码器的一部分。我是不怎么赞同使用多线程下载的,加之有的链接下载速度本身就比较快,所以在下载速度足够的情况下,就让下载线程退出,直到只剩下一个下载线程。当然,多线程中令人头痛的死锁问题、HttpURLConnection的超时阻塞问题都会使代码看起来异常复杂。

 

(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)

简要介绍一下实现多线程环形缓冲的方法。将缓冲区buf[]分为16块,每块32K,下载线程负责向缓冲区写数据,每次写一块;读线程(BuffRandAcceURL类)每次读小于32K的任意字节。同步描述:写/写互斥等待空闲块;写/写并发填写buf[];读/写并发使用buf[]。

 

(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)

经过我很长一段时间使用,我认为比较满意地实现了我的目标,同其它MP3播放器对比,我的这种方法能够比较流畅、稳定地下载并播放。我把实现多线程下载缓冲的方法写出来,不足之处恳请批评指正。

 

(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)

一、HttpReader类功能:HTTP协议从指定URL读取数据

 

(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)

/** *//*** author by http://www.bt285.cn http://www.5a520.cn*/package instream; import java.io.IOException;   import java.io.InputStream;   import java.net.HttpURLConnection;   import java.net.URL; public final class HttpReader {   public static final int MAX_RETRY = 10;   private static long content_length;   private URL url;   private HttpURLConnection httpConnection;   private InputStream in_stream;   private long cur_pos;   //用于决定seek方法中是否执行文件定位   private int connect_timeout;   private int read_timeout;public HttpReader(URL u) {   this(u, 5000, 5000);   }public HttpReader(URL u, int connect_timeout, int read_timeout) {   this.connect_timeout = connect_timeout;   this.read_timeout = read_timeout;   url = u;   if (content_length == 0) { int retry = 0; while (retry  HttpReader.MAX_RETRY) try { this.seek(0); content_length = httpConnection.getContentLength(); break; } catch (Exception e) { retry++; }   }   }public static long getContentLength() {   return content_length;   }public int read(byte[] b, int off, int len) throws IOException {   int r = in_stream.read(b, off, len);   cur_pos += r;   return r;   }public int getData(byte[] b, int off, int len) throws IOException {   int r, rema = len;   while (rema  0) { if ((r = in_stream.read(b, off, rema)) == -1) { return -1; } rema -= r; off += r; cur_pos += r;   }   return len;   }public void close() {   if (httpConnection != null) { httpConnection.disconnect(); httpConnection = null;   }   if (in_stream != null) { try { in_stream.close(); } catch (IOException e) {} in_stream = null;   }   url = null;   }/**//*   * 抛出异常通知再试   * 响应码503可能是由某种暂时的原因引起的,例如同一IP频繁的连接请求可能遭服务器拒绝   */  public void seek(long start_pos) throws IOException {   if (start_pos == cur_pos && in_stream != null) return;   if (httpConnection != null) { httpConnection.disconnect(); httpConnection = null;   }   if (in_stream != null) { in_stream.close(); in_stream = null;   }   httpConnection = (HttpURLConnection) url.openConnection();   httpConnection.setConnectTimeout(connect_timeout);   httpConnection.setReadTimeout(read_timeout);   String sProperty = "bytes=" + start_pos + "-";   httpConnection.setRequestProperty("Range", sProperty);   //httpConnection.setRequestProperty("Connection", "Keep-Alive");   int responseCode = httpConnection.getResponseCode();   if (responseCode  200 || responseCode = 300) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } throw new IOException("HTTP responseCode="+responseCode);   } in_stream = httpConnection.getInputStream();   cur_pos = start_pos;   } }

 

(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)

二、IWriterCallBack接口功能:实现读/写通信。

 

(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)

package instream; public interface IWriterCallBack {   public boolean tryWriting(Writer w) throws InterruptedException;   public void updateBuffer(int i, int len);   public void updateWriterCount();   public void terminateWriters();   }

 

 

(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)

三、Writer类:下载线程,负责向buf[]写数据。

 

(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)

/** *//*** http://www.bt285.cn http://www.5a520.cn */package instream;   import java.io.IOException;   import java.net.URL; public final class Writer implements Runnable {   private static boolean isalive = true;   private byte[] buf;   private IWriterCallBack icb;   protected int index;//buf[]内"块"索引号   protected long start_pos;   //index对应的文件位置(相对于文件首的偏移量)   protected int await_count;  //用于判断:下载速度足够就退出一个"写"线程   private HttpReader hr;public Writer(IWriterCallBack call_back, URL u, byte[] b, int i) {   hr = new HttpReader(u);   if(HttpReader.getContentLength() == 0)  //实例化HttpReader对象都不成功 return;   icb = call_back;   buf = b;   Thread t = new Thread(this,"dt_"+i);   t.setPriority(Thread.NORM_PRIORITY + 1);   t.start();   }public void run() {   int write_bytes=0, write_pos=0, rema = 0, retry = 0;   boolean cont = true;   while (cont) { try { // 1.等待空闲块 if(retry == 0) { if (icb.tryWriting(this) == false)break; write_bytes = 0; rema = BuffRandAcceURL.UNIT_LENGTH; write_pos = index  BuffRandAcceURL.UNIT_LENGTH_BITS; }   // 2.定位 hr.seek(start_pos);   // 3.下载"一块" int w; while (rema  0 && isalive) { w = (rema  2048) ? rema : 2048; //每次读几K合适? if ((w = hr.read(buf, write_pos, w)) == -1) {cont = false;break; } rema -= w; write_pos += w; start_pos += w; write_bytes += w; }   //4.通知"读"线程 retry = 0; icb.updateBuffer(index, write_bytes); } catch (InterruptedException e) { isalive = false; icb.terminateWriters(); break; } catch (IOException e) { if(++retry == HttpReader.MAX_RETRY) { isalive = false; icb.terminateWriters(); break; } }   }   icb.updateWriterCount();   try { hr.close();   } catch (Exception e) {}   hr = null;   buf = null;   icb = null;   } }

 

(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)

四、IRandomAccess接口:

 

(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)

随机读取文件接口,BuffRandAcceURL类和BuffRandAcceFile类实现接口方法。BuffRandAcceFile类实现读取本地磁盘文件,这儿就不给出其源码了。

 

(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)

package instream; public interface IRandomAccess {   public int read() throws Exception;   public int read(byte b[]) throws Exception;   public int read(byte b[], int off, int len) throws Exception;   public int dump(int src_off, byte b[], int dst_off, int len) throws Exception;   public void seek(long pos) throws Exception;   public long length();   public long getFilePointer();   public void close();   }

 

(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)

五、BuffRandAcceURL类功能:创建下载线程;read方法从buf[]读数据。

 

(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)

关键是如何简单有效防止死锁?以下只是我的一次尝试,请指正。

 

(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)(本文来源于图老师网站,更多请访问https://m.tulaoshi.com/webkaifa/)

/** *//*** http://www.5a520.cn  http://www.bt285.cn*/ package instream; import java.net.URL;   import java.net.URLDecoder;   import decode.Header;   import tag.MP3Tag;   import tag.TagThread; public final class BuffRandAcceURL implements IRandomAccess, IWriterCallBack {   public static final int UNIT_LENGTH_BITS = 15;//32K   public static final int UNIT_LENGTH = 1  UNIT_LENGTH_BITS;   public static final int BUF_LENGTH = UNIT_LENGTH  4;//16块   public static final int UNIT_COUNT = BUF_LENGTH  UNIT_LENGTH_BITS;   public static final int BUF_LENGTH_MASK = (BUF_LENGTH - 1);   private static final int MAX_WRITER = 8;   private static long file_pointer;   private static int read_pos;   private static int fill_bytes;   private static byte[] buf;  //同时也作读写同步锁:buf.wait()/buf.notify()   private static int[] buf_bytes;   private static int buf_index;   private static int alloc_pos;   private static URL url = null;   private static boolean isalive = true;   private static int writer_count;   private static int await_count;   private long file_length;   private long frame_bytes;public BuffRandAcceURL(String sURL) throws Exception {   this(sURL,MAX_WRITER);   }public BuffRandAcceURL(String sURL, int download_threads) throws Exception {   buf = new byte[BUF_LENGTH];   buf_bytes = new int[UNIT_COUNT];   url = new URL(sURL);//创建线程以异步方式解析ID3   new TagThread(url);//打印当前文件名   try { String s = URLDecoder.decode(sURL, "GBK"); System.out.println("start " + s.substring(s.lastIndexOf("/") + 1)); s = null;   } catch (Exception e) { System.out.println("start " + sURL);   }//创建"写"线程   for(int i = 0; i  download_threads; i++) new Writer(this, url, buf, i+1);   frame_bytes = file_length = HttpReader.getContentLength();   if(file_length == 0) { Header.strLastErr = "连接URL出错,重试 " + HttpReader.MAX_RETRY + " 次后放弃。"; throw new Exception("retry " + HttpReader.MAX_RETRY);   }   writer_count = download_threads;//缓冲   try_cache();//跳过ID3 v2   MP3Tag mP3Tag = new MP3Tag();   int v2_size = mP3Tag.checkID3V2(buf,0);   if (v2_size  0) { frame_bytes -= v2_size; //seek(v2_size): fill_bytes -= v2_size; file_pointer = v2_size; read_pos = v2_size; read_pos &= BUF_LENGTH_MASK; int units = v2_size  UNIT_LENGTH_BITS; for(int i = 0; i  units; i++) { buf_bytes[i] = 0; this.notifyWriter(); } buf_bytes[units] -= v2_size; this.notifyWriter();   }   mP3Tag = null;   }private void try_cache() throws InterruptedException {   int cache_size = BUF_LENGTH;   if(cache_size  (int)file_length - alloc_pos) cache_size = (int)file_length - alloc_pos;   cache_size -= UNIT_LENGTH;//等待填写当前正在读的那"一块"缓冲区   /**//*if(fill_bytes = cache_size && writer_count  0) {synchronized (buf) {buf.wait();}return;  }*/   //等待填满缓冲区   while (fill_bytes  cache_size) { if (writer_count == 0 || isalive == false) return; if(BUF_LENGTH  (int)file_length - alloc_pos) cache_size = (int)file_length - alloc_pos - UNIT_LENGTH; System.out.printf("r[缓冲%1$6.2f%%] ",(float)fill_bytes / cache_size * 100); synchronized (buf) { buf.wait(); }   }   System.out.printf("r");   }private int try_reading(int i, int len) throws Exception {   int n = (i == UNIT_COUNT - 1) ? 0 : (i + 1);   int r = (buf_bytes[i] == 0) ? 0 : (buf_bytes[i] + buf_bytes[n]);   while (r  len) { if (writer_count == 0 || isalive == false) return r; try_cache(); r = (buf_bytes[i] == 0) ? 0 : (buf_bytes[i] + buf_bytes[n]);   }return len;   }/**//*   * 各个"写"线程互斥等待空闲块   */  public synchronized boolean tryWriting(Writer w) throws InterruptedException {   await_count++;   while (buf_bytes[buf_index] != 0 && isalive) { this.wait();   }//下载速度足够就结束一个"写"线程   if(writer_count  1 && w.await_count = await_count && w.await_count = writer_count) return false;if(alloc_pos = file_length) return false;   w.await_count = await_count;   await_count--;   w.start_pos = alloc_pos;   w.index = buf_index;   alloc_pos += UNIT_LENGTH;   buf_index = (buf_index == UNIT_COUNT - 1) ? 0 : buf_index + 1;   return isalive;   }public void updateBuffer(int i, int len) {   synchronized (buf) { buf_bytes[i] = len; fill_bytes += len; buf.notify();   }   }public void updateWriterCount() {   synchronized (buf) { writer_count--; buf.notify();   }   }public synchronized void notifyWriter() {   this.notifyAll();   }public void terminateWriters() {   synchronized (buf) { if (isalive) { isalive = false; Header.strLastErr = "读取文件超时。重试 " + HttpReader.MAX_RETRY+ " 次后放弃,请您稍后再试。"; } buf.notify();   }notifyWriter();}public int read() throws Exception {   int iret = -1;   int i = read_pos  UNIT_LENGTH_BITS;   // 1."等待"有1字节可读   while (buf_bytes[i]  1) { try_cache(); if (writer_count == 0) return -1;   }   if(isalive == false) return -1; // 2.读取   iret = buf[read_pos] & 0xff;   fill_bytes--;   file_pointer++;   read_pos++;   read_pos &= BUF_LENGTH_MASK;   if (--buf_bytes[i] == 0) notifyWriter(); // 3.通知 return iret;   }public int read(byte b[]) throws Exception {   return read(b, 0, b.length);   } public int read(byte[] b, int off, int len) throws Exception {   if(len  UNIT_LENGTH) len = UNIT_LENGTH;   int i = read_pos  UNIT_LENGTH_BITS;// 1."等待"有足够内容可读   if(try_reading(i, len)  len || isalive == false) return -1; // 2.读取   int tail_len = BUF_LENGTH - read_pos; // write_pos != BUF_LENGTH   if (tail_len  len) { System.arraycopy(buf, read_pos, b, off, tail_len); System.arraycopy(buf, 0, b, off + tail_len, len - tail_len);   } elseSystem.arraycopy(buf, read_pos, b, off, len); fill_bytes -= len;   file_pointer += len;   read_pos += len;   read_pos &= BUF_LENGTH_MASK;   buf_bytes[i] -= len;   if (buf_bytes[i]  0) { int ni = read_pos  UNIT_LENGTH_BITS; buf_bytes[ni] += buf_bytes[i]; buf_bytes[i] = 0; notifyWriter();   } else if (buf_bytes[i] == 0) notifyWriter();return len;   }/**//*   * 从src_off位置复制,不移动文件"指针"   */  public int dump(int src_off, byte b[], int dst_off, int len) throws Exception {   int rpos = read_pos + src_off;   if(try_reading(rpos  UNIT_LENGTH_BITS, len)  len || isalive == false) return -1;   int tail_len = BUF_LENGTH - rpos;   if (tail_len  len) { System.arraycopy(buf, rpos, b, dst_off, tail_len); System.arraycopy(buf, 0, b, dst_off + tail_len, len - tail_len);   } elseSystem.arraycopy(buf, rpos, b, dst_off, len);   // 不发信号 return len;   }public long length() {   return file_length;   }public long getFilePointer() {   return file_pointer;   } public void close() {   //   }//   public void seek(long pos) throws Exception {   //   }  }

展开更多 50%)
分享

猜你喜欢

简要介绍实现多线程环形缓冲的方法

Web开发
简要介绍实现多线程环形缓冲的方法

JavaScript多线程的实现方法

Web开发
JavaScript多线程的实现方法

s8lol主宰符文怎么配

英雄联盟 网络游戏
s8lol主宰符文怎么配

IOS多线程编程的3种实现方法

编程语言 网络编程
IOS多线程编程的3种实现方法

Java多线程编程精要之实现线程

Java JAVA基础
Java多线程编程精要之实现线程

lol偷钱流符文搭配推荐

英雄联盟 网络游戏
lol偷钱流符文搭配推荐

浅析php中实现多线程

PHP
浅析php中实现多线程

在IOS中为什么使用多线程及多线程实现的三种方法

编程语言 网络编程
在IOS中为什么使用多线程及多线程实现的三种方法

lolAD刺客新符文搭配推荐

英雄联盟
lolAD刺客新符文搭配推荐

excel自动醒目的小计

excel自动醒目的小计

Excel金额小写转大写公式

Excel金额小写转大写公式
下拉加载更多内容 ↓