知识大全 用Java多线程实现无阻塞读取远程文件
Posted 文件
篇首语:不要在乎别人的眼光,这样我们会生不如死。本文由小常识网(cha138.com)小编为大家整理,主要介绍了知识大全 用Java多线程实现无阻塞读取远程文件相关的知识,希望对你有一定的参考价值。
用Java多线程实现无阻塞读取远程文件 以下文字资料是由(全榜网网www.cha138.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧!
我是不怎么赞同使用Java多线程下载的 加之有的链接下载速度本身就比较快 所以在下载速度足够的情况下 就让下载线程退出 直到只剩下一个下载线程 当然 多线程中令人头痛的死锁问题 HttpURLConnection的超时阻塞问题都会使代码看起来异常复杂
简要介绍一下使用Java多线程实现无阻塞读取远程文件的方法 将缓冲区buf[]分为 块 每块 K 下载线程负责向缓冲区写数据 每次写一块 读线程(BuffRandAcceURL类)每次读小于 K的任意字节 同步描述 写/写互斥等待空闲块 写/写并发填写buf[] 读/写并发使用buf[]
经过我很长一段时间使用 我认为比较满意地实现了我的目标 同其它MP 播放器对比 我的这种方法能够比较流畅 稳定地下载并播放 我把实现多线程下载缓冲的方法写出来 不足之处恳请批评指正
一 HttpReader类功能 HTTP协议从指定URL读取数据
/** *//**
* author by
*/
package instream;
import java io IOException;
import java io InputStream;
import HttpURLConnection;
import URL;
public final class HttpReader
public static final int MAX_RETRY = ;
private static long content_length;
private URL url;
private HttpURLConnection Connection;
private InputStream in_stream;
private long cur_pos; //用于决定seek方法中是否执行文件定位
private int connect_timeout;
private int read_timeout;
public HttpReader(URL u)
this(u );
public HttpReader(URL u int connect_timeout int read_timeout)
nnect_timeout = connect_timeout;
this read_timeout = read_timeout;
url = u;
if (content_length == )
int retry = ;
while (retry < HttpReader MAX_RETRY)
try
this seek( );
content_length = ();
break;
catch (Exception e)
retry++;
public static long getContentLength()
return content_length;
public int read(byte[] b int off int len) throws IOException
int r = in_stream read(b off len);
cur_pos += r;
return r;
public int getData(byte[] b int off int len) throws IOException
int r rema = len;
while (rema > )
if ((r = in_stream read(b off rema)) == )
return ;
rema = r;
off += r;
cur_pos += r;
return len;
public void close()
if (Connection != null)
();
Connection = null;
if (in_stream != null)
try
in_stream close();
catch (IOException e)
in_stream = null;
url = null;
/**//*
* 抛出异常通知再试
* 响应码 可能是由某种暂时的原因引起的 例如同一IP频繁的连接请求可能遭服务器拒绝
*/
public void seek(long start_pos) throws IOException
if (start_pos == cur_pos && in_stream != null)
return;
if (Connection != null)
();
Connection = null;
if (in_stream != null)
in_stream close();
in_stream = null;
Connection = (HttpURLConnection) url openConnection();
(connect_timeout);
(read_timeout);
String sProperty = bytes= + start_pos + ;
( Range sProperty);
//( Connection Keep Alive );
int responseCode = ();
if (responseCode < || responseCode >= )
try
Thread sleep( );
catch (InterruptedException e)
e printStackTrace();
throw new IOException( HTTP responseCode= +responseCode);
in_stream = ();
cur_pos = start_pos;
二 IWriterCallBack接口功能 实现读/写通信
package instream;
public interface IWriterCallBack
public boolean tryWriting(Writer w) throws InterruptedException;
public void updateBuffer(int i int len);
public void updateWriterCount();
public void terminateWriters();
三 Writer类 下载线程 负责向buf[]写数据
/** *//**
*
*/
package instream;
import java io IOException;
import URL;
public final class Writer implements Runnable
private static boolean isalive = true;
private byte[] buf;
private IWriterCallBack icb;
protected int index; //buf[]内 块 索引号
protected long start_pos; //index对应的文件位置(相对于文件首的偏移量)
protected int await_count; //用于判断:下载速度足够就退出一个 写 线程
private HttpReader hr;
public Writer(IWriterCallBack call_back URL u byte[] b int i)
hr = new HttpReader(u);
if(HttpReader getContentLength() == ) //实例化HttpReader对象都不成功
return;
icb = call_back;
buf = b;
Thread t = new Thread(this dt_ +i);
t setPriority(Thread NORM_PRIORITY + );
t start();
public void run()
int write_bytes= write_pos= rema = retry = ;
boolean cont = true;
while (cont)
try
// 等待空闲块
if(retry == )
if (icb tryWriting(this) == false)
break;
write_bytes = ;
rema = BuffRandAcceURL UNIT_LENGTH;
write_pos = index << BuffRandAcceURL UNIT_LENGTH_BITS;
// 定位
hr seek(start_pos);
// 下载 一块
int w;
while (rema > && isalive)
w = (rema < ) ? rema : ; //每次读几K合适?
if ((w = hr read(buf write_pos w)) == )
cont = false;
break;
rema = w;
write_pos += w;
start_pos += w;
write_bytes += w;
// 通知 读 线程
retry = ;
icb updateBuffer(index write_bytes);
catch (InterruptedException e)
isalive = false;
icb terminateWriters();
break;
catch (IOException e)
if(++retry == HttpReader MAX_RETRY)
isalive = false;
icb terminateWriters();
break;
icb updateWriterCount();
try
hr close();
catch (Exception e)
hr = null;
buf = null;
icb = null;
四 IRandomAccess接口
无阻塞读取远程文件中需要随机读取文件接口 BuffRandAcceURL类和BuffRandAcceFile类实现接口方法 BuffRandAcceFile类实现读取本地磁盘文件 这儿就不给出其源码了
package instream;
public interface IRandomAccess
public int read() throws Exception;
public int read(byte b[]) throws Exception;
public int read(byte b[] int off int len) throws Exception;
public int dump(int src_off byte b[] int dst_off int len) throws Exception;
public void seek(long pos) throws Exception;
public long length();
public long getFilePointer();
public void close();
五 BuffRandAcceURL类功能 创建下载线程 read方法从buf[]读数据
关键是如何简单有效防止死锁?以下只是我的一次尝试 请指正
/** *//**
* ;
*/
package instream;
import URL;
import URLDecoder;
import decode Header;
import tag MP Tag;
import tag TagThread;
public final class BuffRandAcceURL implements IRandomAccess IWriterCallBack
public static final int UNIT_LENGTH_BITS = ; // K
public static final int UNIT_LENGTH = << UNIT_LENGTH_BITS;
public static final int BUF_LENGTH = UNIT_LENGTH << ; // 块
public static final int UNIT_COUNT = BUF_LENGTH >> UNIT_LENGTH_BITS;
public static final int BUF_LENGTH_MASK = (BUF_LENGTH );
private static final int MAX_WRITER = ;
private static long file_pointer;
private static int read_pos;
private static int fill_bytes;
private static byte[] buf; //同时也作读写同步锁:buf wait()/buf notify()
private static int[] buf_bytes;
private static int buf_index;
private static int alloc_pos;
private static URL url = null;
private static boolean isalive = true;
private static int writer_count;
private static int await_count;
private long file_length;
private long frame_bytes;
public BuffRandAcceURL(String sURL) throws Exception
this(sURL MAX_WRITER);
public BuffRandAcceURL(String sURL int download_threads) throws Exception
buf = new byte[BUF_LENGTH];
buf_bytes = new int[UNIT_COUNT];
url = new URL(sURL);
//创建线程以异步方式解析ID
new TagThread(url);
//打印当前文件名
try
String s = URLDecoder decode(sURL GBK );
System out println( start>> + s substring(s lastIndexOf( / ) + ));
s = null;
catch (Exception e)
System out println( start>> + sURL);
//创建 写 线程
for(int i = ; i < download_threads; i++)
new Writer(this url buf i+ );
frame_bytes = file_length = HttpReader getContentLength();
if(file_length == )
Header strLastErr = 连接URL出错 重试 + HttpReader MAX_RETRY + 次后放弃 ;
throw new Exception( retry + HttpReader MAX_RETRY);
writer_count = download_threads;
//缓冲
try_cache();
//跳过ID v
MP Tag mP Tag = new MP Tag();
int v _size = mP Tag checkID V (buf );
if (v _size > )
frame_bytes = v _size;
//seek(v _size):
fill_bytes = v _size;
file_pointer = v _size;
read_pos = v _size;
read_pos &= BUF_LENGTH_MASK;
int units = v _size >> UNIT_LENGTH_BITS;
for(int i = ; i < units; i++)
buf_bytes[i] = ;
this notifyWriter();
buf_bytes[units] = v _size;
this notifyWriter();
mP Tag = null;
private void try_cache() throws InterruptedException
int cache_size = BUF_LENGTH;
if(cache_size > (int)file_length alloc_pos)
cache_size = (int)file_length alloc_pos;
cache_size = UNIT_LENGTH;
//等待填写当前正在读的那 一块 缓冲区
/**//*if(fill_bytes >= cache_size && writer_count > )
synchronized (buf)
buf wait();
return;
*/
//等待填满缓冲区
while (fill_bytes < cache_size)
if (writer_count == || isalive == false)
return;
if(BUF_LENGTH > (int)file_length alloc_pos)
cache_size = (int)file_length alloc_pos UNIT_LENGTH;
System out printf( \\r[缓冲% $ f%%] (float)fill_bytes / cache_size * );
synchronized (buf)
buf wait();
System out printf( \\r );
private int try_reading(int i int len) throws Exception
int n = (i == UNIT_COUNT ) ? : (i + );
int r = (buf_bytes[i] == ) ? : (buf_bytes[i] + buf_bytes[n]);
while (r < len)
if (writer_count == || isalive == false)
return r;
try_cache();
r = (buf_bytes[i] == ) ? : (buf_bytes[i] + buf_bytes[n]);
return len;
/**//*
* 各个 写 线程互斥等待空闲块
*/
public synchronized boolean tryWriting(Writer w) throws InterruptedException
await_count++;
while (buf_bytes[buf_index] != && isalive)
this wait();
//下载速度足够就结束一个 写 线程
if(writer_count > && w await_count >= await_count &&
w await_count >= writer_count)
return false;
if(alloc_pos >= file_length)
return false;
w await_count = await_count;
await_count ;
w start_pos = alloc_pos;
w index = buf_index;
alloc_pos += UNIT_LENGTH;
buf_index = (buf_index == UNIT_COUNT ) ? : buf_index + ;
return isalive;
public void updateBuffer(int i int len)
synchronized (buf)
buf_bytes[i] = len;
fill_bytes += len;
buf notify();
public void updateWriterCount()
synchronized (buf)
writer_count ;
buf notify();
public synchronized void notifyWriter()
this notifyAll();
public void terminateWriters()
synchronized (buf)
if (isalive)
isalive = false;
Header strLastErr = 读取文件超时 重试 + HttpReader MAX_RETRY
+ 次后放弃 请您稍后再试 ;
buf notify();
notifyWriter();
public int read() throws Exception
int iret = ;
int i = read_pos >> UNIT_LENGTH_BITS;
// 等待 有 字节可读
while (buf_bytes[i] < )
try_cache();
if (writer_count == )
return ;
if(isalive == false)
return ;
// 读取
iret = buf[read_pos] & xff;
fill_bytes ;
file_pointer++;
read_pos++;
read_pos &= BUF_LENGTH_MASK;
if ( buf_bytes[i] == )
notifyWriter(); // 通知
return iret;
public int read(byte b[]) throws Exception
return read(b b length);
public int read(byte[] b int off int len) throws Exception
if(len > UNIT_LENGTH)
len = UNIT_LENGTH;
int i = read_pos >> UNIT_LENGTH_BITS;
// 等待 有足够内容可读
if(try_reading(i len) < len || isalive == false)
return ;
// 读取
int tail_len = BUF_LENGTH read_pos; // write_pos != BUF_LENGTH
if (tail_len < len)
System arraycopy(buf read_pos b off tail_len);
System arraycopy(buf b off + tail_len len tail_len);
else
System arraycopy(buf read_pos b off len);
fill_bytes = len;
file_pointer += len;
read_pos += len;
read_pos &= BUF_LENGTH_MASK;
buf_bytes[i] = len;
if (buf_bytes[i] < )
int ni = read_pos >> UNIT_LENGTH_BITS;
buf_bytes[ni] += buf_bytes[i];
buf_bytes[i] = ;
notifyWriter();
else if (buf_bytes[i] == )
notifyWriter();
return len;
/**//*
* 从src_off位置复制 不移动文件 指针
*/
public int dump(int src_off byte b[] int dst_off int len) throws Exception
int rpos = read_pos + src_off;
if(try_reading(rpos >> UNIT_LENGTH_BITS len) < len || isalive == false)
return ;
int tail_len = BUF_LENGTH rpos;
if (tail_len < len)
System arraycopy(buf rpos b dst_off tail_len);
System arraycopy(buf b dst_off + tail_len len tail_len);
else
System arraycopy(buf rpos b dst_off len);
// 不发信号
return len;
public long length()
return file_length;
public long getFilePointer()
return file_pointer;
public void close()
//
//
public void seek(long pos) throws Exception
//
cha138/Article/program/Java/gj/201311/27619
相关参考
Java多线程如何防止主线的阻塞 以下文字资料是由(全榜网网www.cha138.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧! Java多线程在
Java多线程下载 以下文字资料是由(全榜网网www.cha138.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧! 同时下载多个文件不过单文件没有
/** *thisprogramTODO &nb
Queue接口与ListSet同一级别都是继承了Collection接口LinkedList实现了Queue接口Queue接口窄化了对LinkedList的方法的访问权限(即在方法中的参数类型如果
java读取文本文件的方法有很多这个例子主要介绍最简单最常用的BufferedReader类 完整例子如下 packagenetchinaunixbloghzmtext; impor
Java相对路径读取文件 以下文字资料是由(全榜网网www.cha138.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧! 不管你是新手还是老鸟在程
JAVA如何实现从最后一行读取文件 以下文字资料是由(全榜网网www.cha138.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧! JAVA如何实
使用javautilProperties类的load()方法 示例 InputStreamin=newBufferedInputStream(newFileInputStream(name))
以下是实现的代码cha138/Article/program/Java/hx/201311/26965
JAVA读取xml文件中节点值 以下文字资料是由(全榜网网www.cha138.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧! importwcd