知识大全 用Java多线程实现无阻塞读取远程文件

Posted 文件

篇首语:不要在乎别人的眼光,这样我们会生不如死。本文由小常识网(cha138.com)小编为大家整理,主要介绍了知识大全 用Java多线程实现无阻塞读取远程文件相关的知识,希望对你有一定的参考价值。

用Java多线程实现无阻塞读取远程文件  以下文字资料是由(全榜网网www.cha138.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧!

  我是不怎么赞同使用Java多线程下载的 加之有的链接下载速度本身就比较快 所以在下载速度足够的情况下 就让下载线程退出 直到只剩下一个下载线程 当然 多线程中令人头痛的死锁问题 HttpURLConnection的超时阻塞问题都会使代码看起来异常复杂

  简要介绍一下使用Java多线程实现无阻塞读取远程文件的方法 将缓冲区buf[]分为 块 每块 K 下载线程负责向缓冲区写数据 每次写一块 读线程(BuffRandAcceURL类)每次读小于 K的任意字节 同步描述 写/写互斥等待空闲块 写/写并发填写buf[] 读/写并发使用buf[]

  经过我很长一段时间使用 我认为比较满意地实现了我的目标 同其它MP 播放器对比 我的这种方法能够比较流畅 稳定地下载并播放 我把实现多线程下载缓冲的方法写出来 不足之处恳请批评指正

  一 HttpReader类功能 HTTP协议从指定URL读取数据

  /** *//**

  * author by

  */

  package instream;

  import java io IOException;

  import java io InputStream;

  import HttpURLConnection;

  import URL;

  public final class HttpReader

  public static final int MAX_RETRY = ;

  private static long content_length;

  private URL url;

  private HttpURLConnection Connection;

  private InputStream in_stream;

  private long cur_pos;           //用于决定seek方法中是否执行文件定位

  private int connect_timeout;

  private int read_timeout;

  public HttpReader(URL u)

  this(u );

  

  public HttpReader(URL u int connect_timeout int read_timeout)

  nnect_timeout = connect_timeout;

  this read_timeout = read_timeout;

  url = u;

  if (content_length == )

  int retry = ;

  while (retry < HttpReader MAX_RETRY)

  try

  this seek( );

  content_length = ();

  break;

   catch (Exception e)

  retry++;

  

  

  

  public static long getContentLength()

  return content_length;

  

  public int read(byte[] b int off int len) throws IOException

  int r = in_stream read(b off len);

  cur_pos += r;

  return r;

  

  public int getData(byte[] b int off int len) throws IOException

  int r rema = len;

  while (rema > )

  if ((r = in_stream read(b off rema)) == )

  return ;

  

  rema = r;

  off += r;

  cur_pos += r;

  

  return len;

  

  public void close()

  if (Connection != null)

  ();

  Connection = null;

  

  if (in_stream != null)

  try

  in_stream close();

   catch (IOException e)

  in_stream = null;

  

  url = null;

  

  /**//*

  * 抛出异常通知再试

  * 响应码 可能是由某种暂时的原因引起的 例如同一IP频繁的连接请求可能遭服务器拒绝

  */

  public void seek(long start_pos) throws IOException

  if (start_pos == cur_pos && in_stream != null)

  return;

  if (Connection != null)

  ();

  Connection = null;

  

  if (in_stream != null)

  in_stream close();

  in_stream = null;

  

  Connection = (HttpURLConnection) url openConnection();

  (connect_timeout);

  (read_timeout);

  String sProperty = bytes= + start_pos + ;

  ( Range sProperty);

  //( Connection Keep Alive );

  int responseCode = ();

  if (responseCode < || responseCode >= )

  try

  Thread sleep( );

   catch (InterruptedException e)

  e printStackTrace();

  

  throw new IOException( HTTP responseCode= +responseCode);

  

  in_stream = ();

  cur_pos = start_pos;

  

  

  二 IWriterCallBack接口功能 实现读/写通信

  package instream;

  public interface IWriterCallBack

  public boolean tryWriting(Writer w) throws InterruptedException;

  public void updateBuffer(int i int len);

  public void updateWriterCount();

  public void terminateWriters();

  

  三 Writer类 下载线程 负责向buf[]写数据

  /** *//**

  *

  */

  package instream;

  import java io IOException;

  import URL;

  public final class Writer implements Runnable

  private static boolean isalive = true;

  private byte[] buf;

  private IWriterCallBack icb;

  protected int index;            //buf[]内 块 索引号

  protected long start_pos;       //index对应的文件位置(相对于文件首的偏移量)

  protected int await_count;      //用于判断:下载速度足够就退出一个 写 线程

  private HttpReader hr;

  public Writer(IWriterCallBack call_back URL u byte[] b int i)

  hr = new HttpReader(u);

  if(HttpReader getContentLength() == )  //实例化HttpReader对象都不成功

  return;

  icb = call_back;

  buf = b;

  Thread t = new Thread(this dt_ +i);

  t setPriority(Thread NORM_PRIORITY + );

  t start();

  

  public void run()

  int write_bytes= write_pos= rema = retry = ;

  boolean cont = true;

  while (cont)

  try

  // 等待空闲块

  if(retry == )

  if (icb tryWriting(this) == false)

  break;

  write_bytes = ;

  rema = BuffRandAcceURL UNIT_LENGTH;

  write_pos = index << BuffRandAcceURL UNIT_LENGTH_BITS;

  

  // 定位

  hr seek(start_pos);

  // 下载 一块

  int w;

  while (rema > && isalive)

  w = (rema < ) ? rema : ; //每次读几K合适?

  if ((w = hr read(buf write_pos w)) == )

  cont = false;

  break;

  

  rema = w;

  write_pos += w;

  start_pos += w;

  write_bytes += w;

  

  // 通知 读 线程

  retry = ;

  icb updateBuffer(index write_bytes);

   catch (InterruptedException e)

  isalive = false;

  icb terminateWriters();

  break;

   catch (IOException e)

  if(++retry == HttpReader MAX_RETRY)

  isalive = false;

  icb terminateWriters();

  break;

  

  

  

  icb updateWriterCount();

  try

  hr close();

   catch (Exception e)

  hr = null;

  buf = null;

  icb = null;

  

  

  四 IRandomAccess接口

  无阻塞读取远程文件中需要随机读取文件接口 BuffRandAcceURL类和BuffRandAcceFile类实现接口方法 BuffRandAcceFile类实现读取本地磁盘文件 这儿就不给出其源码了

  package instream;

  public interface IRandomAccess

  public int read() throws Exception;

  public int read(byte b[]) throws Exception;

  public int read(byte b[] int off int len) throws Exception;

  public int dump(int src_off byte b[] int dst_off int len) throws Exception;

  public void seek(long pos) throws Exception;

  public long length();

  public long getFilePointer();

  public void close();

  

  五 BuffRandAcceURL类功能 创建下载线程 read方法从buf[]读数据

  关键是如何简单有效防止死锁?以下只是我的一次尝试 请指正

  /** *//**

  * ;

  */

  package instream;

  import URL;

  import URLDecoder;

  import decode Header;

  import tag MP Tag;

  import tag TagThread;

  public final class BuffRandAcceURL implements IRandomAccess IWriterCallBack

  public static final int UNIT_LENGTH_BITS = ;                  // K

  public static final int UNIT_LENGTH = << UNIT_LENGTH_BITS;

  public static final int BUF_LENGTH = UNIT_LENGTH << ;            // 块

  public static final int UNIT_COUNT = BUF_LENGTH >> UNIT_LENGTH_BITS;

  public static final int BUF_LENGTH_MASK = (BUF_LENGTH );

  private static final int MAX_WRITER = ;

  private static long file_pointer;

  private static int read_pos;

  private static int fill_bytes;

  private static byte[] buf;      //同时也作读写同步锁:buf wait()/buf notify()

  private static int[] buf_bytes;

  private static int buf_index;

  private static int alloc_pos;

  private static URL url = null;

  private static boolean isalive = true;

  private static int writer_count;

  private static int await_count;

  private long file_length;

  private long frame_bytes;

  public BuffRandAcceURL(String sURL) throws Exception

  this(sURL MAX_WRITER);

  

  public BuffRandAcceURL(String sURL int download_threads) throws Exception

  buf = new byte[BUF_LENGTH];

  buf_bytes = new int[UNIT_COUNT];

  url = new URL(sURL);

  //创建线程以异步方式解析ID

  new TagThread(url);

  //打印当前文件名

  try

  String s = URLDecoder decode(sURL GBK );

  System out println( start>> + s substring(s lastIndexOf( / ) + ));

  s = null;

   catch (Exception e)

  System out println( start>> + sURL);

  

  //创建 写 线程

  for(int i = ; i < download_threads; i++)

  new Writer(this url buf i+ );

  frame_bytes = file_length = HttpReader getContentLength();

  if(file_length == )

  Header strLastErr = 连接URL出错 重试 + HttpReader MAX_RETRY + 次后放弃 ;

  throw new Exception( retry + HttpReader MAX_RETRY);

  

  writer_count = download_threads;

  //缓冲

  try_cache();

  //跳过ID v

  MP Tag mP Tag = new MP Tag();

  int v _size = mP Tag checkID V (buf );

  if (v _size > )

  frame_bytes = v _size;

  //seek(v _size):

  fill_bytes = v _size;

  file_pointer = v _size;

  read_pos = v _size;

  read_pos &= BUF_LENGTH_MASK;

  int units = v _size >> UNIT_LENGTH_BITS;

  for(int i = ; i < units; i++)

  buf_bytes[i] = ;

  this notifyWriter();

  

  buf_bytes[units] = v _size;

  this notifyWriter();

  

  mP Tag = null;

  

  private void try_cache() throws InterruptedException

  int cache_size = BUF_LENGTH;

  if(cache_size > (int)file_length alloc_pos)

  cache_size = (int)file_length alloc_pos;

  cache_size = UNIT_LENGTH;

  //等待填写当前正在读的那 一块 缓冲区

  /**//*if(fill_bytes >= cache_size && writer_count > )

  synchronized (buf)

  buf wait();

  

  return;

  */

  //等待填满缓冲区

  while (fill_bytes < cache_size)

  if (writer_count == || isalive == false)

  return;

  if(BUF_LENGTH > (int)file_length alloc_pos)

  cache_size = (int)file_length alloc_pos UNIT_LENGTH;

  System out printf( \\r[缓冲% $ f%%] (float)fill_bytes / cache_size * );

  synchronized (buf)

  buf wait();

  

  

  System out printf( \\r );

  

  private int try_reading(int i int len) throws Exception

  int n = (i == UNIT_COUNT ) ? : (i + );

  int r = (buf_bytes[i] == ) ? : (buf_bytes[i] + buf_bytes[n]);

  while (r < len)

  if (writer_count == || isalive == false)

  return r;

  try_cache();

  r = (buf_bytes[i] == ) ? : (buf_bytes[i] + buf_bytes[n]);

  

  return len;

  

  /**//*

  * 各个 写 线程互斥等待空闲块

  */

  public synchronized boolean tryWriting(Writer w) throws InterruptedException

  await_count++;

  while (buf_bytes[buf_index] != && isalive)

  this wait();

  

  //下载速度足够就结束一个 写 线程

  if(writer_count > && w await_count >= await_count &&

  w await_count >= writer_count)

  return false;

  if(alloc_pos >= file_length)

  return false;

  w await_count = await_count;

  await_count ;

  w start_pos = alloc_pos;

  w index = buf_index;

  alloc_pos += UNIT_LENGTH;

  buf_index = (buf_index == UNIT_COUNT ) ? : buf_index + ;

  return isalive;

  

  public void updateBuffer(int i int len)

  synchronized (buf)

  buf_bytes[i] = len;

  fill_bytes += len;

  buf notify();

  

  

  public void updateWriterCount()

  synchronized (buf)

  writer_count ;

  buf notify();

  

  

  public synchronized void notifyWriter()

  this notifyAll();

  

  public void terminateWriters()

  synchronized (buf)

  if (isalive)

  isalive = false;

  Header strLastErr = 读取文件超时 重试 + HttpReader MAX_RETRY

  + 次后放弃 请您稍后再试 ;

  

  buf notify();

  

  notifyWriter();

  

  public int read() throws Exception

  int iret = ;

  int i = read_pos >> UNIT_LENGTH_BITS;

  // 等待 有 字节可读

  while (buf_bytes[i] < )

  try_cache();

  if (writer_count == )

  return ;

  

  if(isalive == false)

  return ;

  // 读取

  iret = buf[read_pos] & xff;

  fill_bytes ;

  file_pointer++;

  read_pos++;

  read_pos &= BUF_LENGTH_MASK;

  if ( buf_bytes[i] == )

  notifyWriter();     // 通知

  return iret;

  

  public int read(byte b[]) throws Exception

  return read(b b length);

  

  public int read(byte[] b int off int len) throws Exception

  if(len > UNIT_LENGTH)

  len = UNIT_LENGTH;

  int i = read_pos >> UNIT_LENGTH_BITS;

  // 等待 有足够内容可读

  if(try_reading(i len) < len || isalive == false)

  return ;

  // 读取

  int tail_len = BUF_LENGTH read_pos; // write_pos != BUF_LENGTH

  if (tail_len < len)

  System arraycopy(buf read_pos b off tail_len);

  System arraycopy(buf b off + tail_len len tail_len);

   else

  System arraycopy(buf read_pos b off len);

  fill_bytes = len;

  file_pointer += len;

  read_pos += len;

  read_pos &= BUF_LENGTH_MASK;

  buf_bytes[i] = len;

  if (buf_bytes[i] < )

  int ni = read_pos >> UNIT_LENGTH_BITS;

  buf_bytes[ni] += buf_bytes[i];

  buf_bytes[i] = ;

  notifyWriter();

   else if (buf_bytes[i] == )

  notifyWriter();

  return len;

  

  /**//*

  * 从src_off位置复制 不移动文件 指针

  */

  public int dump(int src_off byte b[] int dst_off int len) throws Exception

  int rpos = read_pos + src_off;

  if(try_reading(rpos >> UNIT_LENGTH_BITS len) < len || isalive == false)

  return ;

  int tail_len = BUF_LENGTH rpos;

  if (tail_len < len)

  System arraycopy(buf rpos b dst_off tail_len);

  System arraycopy(buf b dst_off + tail_len len tail_len);

   else

  System arraycopy(buf rpos b dst_off len);

  // 不发信号

  return len;

  

  public long length()

  return file_length;

  

  public long getFilePointer()

  return file_pointer;

  

  public void close()

  //

  

  //

  public void seek(long pos) throws Exception

  //

  

cha138/Article/program/Java/gj/201311/27619

相关参考

知识大全 Java多线程如何防止主线的阻塞

Java多线程如何防止主线的阻塞  以下文字资料是由(全榜网网www.cha138.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧!  Java多线程在

知识大全 Java多线程下载

Java多线程下载  以下文字资料是由(全榜网网www.cha138.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧!  同时下载多个文件不过单文件没有

知识大全 用java线程实现批量修改文件名

   /**      *thisprogramTODO    &nb

知识大全 java阻塞队列 线程同步合作

  Queue接口与ListSet同一级别都是继承了Collection接口LinkedList实现了Queue接口Queue接口窄化了对LinkedList的方法的访问权限(即在方法中的参数类型如果

知识大全 java读取文本文件代码

java读取文本文件的方法有很多这个例子主要介绍最简单最常用的BufferedReader类    完整例子如下    packagenetchinaunixbloghzmtext;    impor

知识大全 Java相对路径读取文件

Java相对路径读取文件  以下文字资料是由(全榜网网www.cha138.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧!  不管你是新手还是老鸟在程

知识大全 JAVA如何实现从最后一行读取文件

JAVA如何实现从最后一行读取文件  以下文字资料是由(全榜网网www.cha138.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧!  JAVA如何实

知识大全 java 读取properties 文件

  使用javautilProperties类的load()方法  示例  InputStreamin=newBufferedInputStream(newFileInputStream(name))

知识大全 java读取文件夹下的所有文件夹和文件

  以下是实现的代码cha138/Article/program/Java/hx/201311/26965

知识大全 JAVA读取xml文件中节点值

JAVA读取xml文件中节点值  以下文字资料是由(全榜网网www.cha138.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧!  importwcd