long-term downloads resume

* recovery infrastructure
* bump serialVersionUID of DownloadMission
* misc cleanup in DownloadMission.java
* remove unused/redundant from strings.xml
This commit is contained in:
kapodamy 2019-09-28 18:11:05 -03:00
parent dab53450c1
commit 86dafdd92b
42 changed files with 478 additions and 97 deletions

View file

@ -1,6 +1,7 @@
package us.shandian.giga.get;
import androidx.annotation.NonNull;
import android.text.TextUtils;
import android.util.Log;
import org.schabi.newpipe.streams.io.SharpStream;
@ -151,6 +152,20 @@ public class DownloadInitializer extends Thread {
if (!mMission.running || Thread.interrupted()) return;
if (!mMission.unknownLength && mMission.recoveryInfo != null) {
String entityTag = mConn.getHeaderField("ETAG");
String lastModified = mConn.getHeaderField("Last-Modified");
MissionRecoveryInfo recovery = mMission.recoveryInfo[mMission.current];
if (!TextUtils.isEmpty(entityTag)) {
recovery.validateCondition = entityTag;
} else if (!TextUtils.isEmpty(lastModified)) {
recovery.validateCondition = lastModified;// Note: this is less precise
} else {
recovery.validateCondition = null;
}
}
mMission.running = false;
break;
} catch (InterruptedIOException | ClosedByInterruptException e) {

View file

@ -27,7 +27,7 @@ import us.shandian.giga.util.Utility;
import static org.schabi.newpipe.BuildConfig.DEBUG;
public class DownloadMission extends Mission {
private static final long serialVersionUID = 5L;// last bump: 30 june 2019
private static final long serialVersionUID = 6L;// last bump: 28 september 2019
static final int BUFFER_SIZE = 64 * 1024;
static final int BLOCK_SIZE = 512 * 1024;
@ -51,8 +51,9 @@ public class DownloadMission extends Mission {
public static final int ERROR_INSUFFICIENT_STORAGE = 1010;
public static final int ERROR_PROGRESS_LOST = 1011;
public static final int ERROR_TIMEOUT = 1012;
public static final int ERROR_RESOURCE_GONE = 1013;
public static final int ERROR_HTTP_NO_CONTENT = 204;
public static final int ERROR_HTTP_UNSUPPORTED_RANGE = 206;
static final int ERROR_HTTP_FORBIDDEN = 403;
/**
* The urls of the file to download
@ -125,6 +126,11 @@ public class DownloadMission extends Mission {
*/
public int threadCount = 3;
/**
* information required to recover a download
*/
public MissionRecoveryInfo[] recoveryInfo;
private transient int finishCount;
public transient boolean running;
public boolean enqueued;
@ -132,7 +138,6 @@ public class DownloadMission extends Mission {
public int errCode = ERROR_NOTHING;
public Exception errObject = null;
public transient boolean recovered;
public transient Handler mHandler;
private transient boolean mWritingToFile;
private transient boolean[] blockAcquired;
@ -197,9 +202,9 @@ public class DownloadMission extends Mission {
}
/**
* Open connection
* Opens a connection
*
* @param threadId id of the calling thread, used only for debug
* @param threadId id of the calling thread, used only for debugging
* @param rangeStart range start
* @param rangeEnd range end
* @return a {@link java.net.URLConnection URLConnection} linking to the URL.
@ -251,7 +256,7 @@ public class DownloadMission extends Mission {
case 204:
case 205:
case 207:
throw new HttpError(conn.getResponseCode());
throw new HttpError(statusCode);
case 416:
return;// let the download thread handle this error
default:
@ -270,10 +275,6 @@ public class DownloadMission extends Mission {
synchronized void notifyProgress(long deltaLen) {
if (!running) return;
if (recovered) {
recovered = false;
}
if (unknownLength) {
length += deltaLen;// Update length before proceeding
}
@ -344,7 +345,6 @@ public class DownloadMission extends Mission {
if (running) {
running = false;
recovered = true;
if (threads != null) selfPause();
}
}
@ -409,12 +409,13 @@ public class DownloadMission extends Mission {
* Start downloading with multiple threads.
*/
public void start() {
if (running || isFinished()) return;
if (running || isFinished() || urls.length < 1) return;
// ensure that the previous state is completely paused.
joinForThread(init);
int maxWait = 10000;// 10 seconds
joinForThread(init, maxWait);
if (threads != null) {
for (Thread thread : threads) joinForThread(thread);
for (Thread thread : threads) joinForThread(thread, maxWait);
threads = null;
}
@ -431,6 +432,11 @@ public class DownloadMission extends Mission {
return;
}
if (urls[current] == null) {
doRecover(null);
return;
}
if (blocks == null) {
initializer();
return;
@ -478,7 +484,6 @@ public class DownloadMission extends Mission {
}
running = false;
recovered = true;
if (init != null && init.isAlive()) {
// NOTE: if start() method is running ¡will no have effect!
@ -563,7 +568,7 @@ public class DownloadMission extends Mission {
* Write this {@link DownloadMission} to the meta file asynchronously
* if no thread is already running.
*/
private void writeThisToFile() {
void writeThisToFile() {
synchronized (LOCK) {
if (deleted) return;
Utility.writeToFile(metadata, DownloadMission.this);
@ -667,6 +672,7 @@ public class DownloadMission extends Mission {
* @return {@code true} is this mission its "healthy", otherwise, {@code false}
*/
public boolean isCorrupt() {
if (urls.length < 1) return false;
return (isPsFailed() || errCode == ERROR_POSTPROCESSING_HOLD) || isFinished();
}
@ -710,6 +716,48 @@ public class DownloadMission extends Mission {
return true;
}
/**
* Attempts to recover the download
*
* @param fromError exception which require update the url from the source
*/
void doRecover(Exception fromError) {
Log.i(TAG, "Attempting to recover the mission: " + storage.getName());
if (recoveryInfo == null) {
if (fromError == null)
notifyError(ERROR_RESOURCE_GONE, null);
else
notifyError(fromError);
urls = new String[0];// mark this mission as dead
return;
}
if (threads != null) {
for (Thread thread : threads) {
if (thread == Thread.currentThread()) continue;
thread.interrupt();
joinForThread(thread, 0);
}
}
// set the current download url to null in case if the recovery
// process is canceled. Next time start() method is called the
// recovery will be executed, saving time
urls[current] = null;
if (recoveryInfo[current].attempts >= maxRetry) {
recoveryInfo[current].attempts = 0;
notifyError(fromError);
return;
}
threads = new Thread[]{
runAsync(DownloadMissionRecover.mID, new DownloadMissionRecover(this, fromError))
};
}
private boolean deleteThisFromFile() {
synchronized (LOCK) {
return metadata.delete();
@ -749,7 +797,13 @@ public class DownloadMission extends Mission {
return who;
}
private void joinForThread(Thread thread) {
/**
* Waits at most {@code millis} milliseconds for the thread to die
*
* @param thread the desired thread
* @param millis the time to wait in milliseconds
*/
private void joinForThread(Thread thread, int millis) {
if (thread == null || !thread.isAlive()) return;
if (thread == Thread.currentThread()) return;
@ -764,7 +818,7 @@ public class DownloadMission extends Mission {
// start() method called quickly after pause()
try {
thread.join(10000);
thread.join(millis);
} catch (InterruptedException e) {
Log.d(TAG, "timeout on join : " + thread.getName());
throw new RuntimeException("A thread is still running:\n" + thread.getName());
@ -785,9 +839,9 @@ public class DownloadMission extends Mission {
}
}
static class Block {
int position;
int done;
public static class Block {
public int position;
public int done;
}
private static class Lock implements Serializable {

View file

@ -0,0 +1,222 @@
package us.shandian.giga.get;
import android.util.Log;
import org.schabi.newpipe.extractor.NewPipe;
import org.schabi.newpipe.extractor.StreamingService;
import org.schabi.newpipe.extractor.stream.AudioStream;
import org.schabi.newpipe.extractor.stream.StreamExtractor;
import org.schabi.newpipe.extractor.stream.SubtitlesStream;
import org.schabi.newpipe.extractor.stream.VideoStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.nio.channels.ClosedByInterruptException;
import java.util.List;
import static us.shandian.giga.get.DownloadMission.ERROR_RESOURCE_GONE;
public class DownloadMissionRecover extends Thread {
private static final String TAG = "DownloadMissionRecover";
static final int mID = -3;
private final DownloadMission mMission;
private final MissionRecoveryInfo mRecovery;
private final Exception mFromError;
private HttpURLConnection mConn;
DownloadMissionRecover(DownloadMission mission, Exception originError) {
mMission = mission;
mFromError = originError;
mRecovery = mission.recoveryInfo[mission.current];
}
@Override
public void run() {
if (mMission.source == null) {
mMission.notifyError(mFromError);
return;
}
try {
/*if (mMission.source.startsWith(MissionRecoveryInfo.DIRECT_SOURCE)) {
resolve(mMission.source.substring(MissionRecoveryInfo.DIRECT_SOURCE.length()));
return;
}*/
StreamingService svr = NewPipe.getServiceByUrl(mMission.source);
if (svr == null) {
throw new RuntimeException("Unknown source service");
}
StreamExtractor extractor = svr.getStreamExtractor(mMission.source);
extractor.fetchPage();
if (!mMission.running || super.isInterrupted()) return;
String url = null;
switch (mMission.kind) {
case 'a':
for (AudioStream audio : extractor.getAudioStreams()) {
if (audio.average_bitrate == mRecovery.desiredBitrate && audio.getFormat() == mRecovery.format) {
url = audio.getUrl();
break;
}
}
break;
case 'v':
List<VideoStream> videoStreams;
if (mRecovery.desired2)
videoStreams = extractor.getVideoOnlyStreams();
else
videoStreams = extractor.getVideoStreams();
for (VideoStream video : videoStreams) {
if (video.resolution.equals(mRecovery.desired) && video.getFormat() == mRecovery.format) {
url = video.getUrl();
break;
}
}
break;
case 's':
for (SubtitlesStream subtitles : extractor.getSubtitles(mRecovery.format)) {
String tag = subtitles.getLanguageTag();
if (tag.equals(mRecovery.desired) && subtitles.isAutoGenerated() == mRecovery.desired2) {
url = subtitles.getURL();
break;
}
}
break;
default:
throw new RuntimeException("Unknown stream type");
}
resolve(url);
} catch (Exception e) {
if (!mMission.running || e instanceof ClosedByInterruptException) return;
mRecovery.attempts++;
mMission.notifyError(e);
}
}
private void resolve(String url) throws IOException, DownloadMission.HttpError {
if (mRecovery.validateCondition == null) {
Log.w(TAG, "validation condition not defined, the resource can be stale");
}
if (mMission.unknownLength || mRecovery.validateCondition == null) {
recover(url, false);
return;
}
///////////////////////////////////////////////////////////////////////
////// Validate the http resource doing a range request
/////////////////////
try {
mConn = mMission.openConnection(url, mID, mMission.length - 10, mMission.length);
mConn.setRequestProperty("If-Range", mRecovery.validateCondition);
mMission.establishConnection(mID, mConn);
int code = mConn.getResponseCode();
switch (code) {
case 200:
case 413:
// stale
recover(url, true);
return;
case 206:
// in case of validation using the Last-Modified date, check the resource length
long[] contentRange = parseContentRange(mConn.getHeaderField("Content-Range"));
boolean lengthMismatch = contentRange[2] != -1 && contentRange[2] != mMission.length;
recover(url, lengthMismatch);
return;
}
throw new DownloadMission.HttpError(code);
} catch (Exception e) {
if (!mMission.running || e instanceof ClosedByInterruptException) return;
throw e;
} finally {
this.interrupt();
}
}
private void recover(String url, boolean stale) {
Log.i(TAG,
String.format("download recovered name=%s isStale=%s url=%s", mMission.storage.getName(), stale, url)
);
if (url == null) {
mMission.notifyError(ERROR_RESOURCE_GONE, null);
return;
}
mMission.urls[mMission.current] = url;
mRecovery.attempts = 0;
if (stale) {
mMission.resetState(false, false, DownloadMission.ERROR_NOTHING);
}
mMission.writeThisToFile();
if (!mMission.running || super.isInterrupted()) return;
mMission.running = false;
mMission.start();
}
private long[] parseContentRange(String value) {
long[] range = new long[3];
if (value == null) {
// this never should happen
return range;
}
try {
value = value.trim();
if (!value.startsWith("bytes")) {
return range;// unknown range type
}
int space = value.lastIndexOf(' ') + 1;
int dash = value.indexOf('-', space) + 1;
int bar = value.indexOf('/', dash);
// start
range[0] = Long.parseLong(value.substring(space, dash - 1));
// end
range[1] = Long.parseLong(value.substring(dash, bar));
// resource length
value = value.substring(bar + 1);
if (value.equals("*")) {
range[2] = -1;// unknown length received from the server but should be valid
} else {
range[2] = Long.parseLong(value);
}
} catch (Exception e) {
// nothing to do
}
return range;
}
@Override
public void interrupt() {
super.interrupt();
if (mConn != null) {
try {
mConn.disconnect();
} catch (Exception e) {
// nothing to do
}
}
}
}

View file

@ -10,8 +10,10 @@ import java.net.HttpURLConnection;
import java.nio.channels.ClosedByInterruptException;
import us.shandian.giga.get.DownloadMission.Block;
import us.shandian.giga.get.DownloadMission.HttpError;
import static org.schabi.newpipe.BuildConfig.DEBUG;
import static us.shandian.giga.get.DownloadMission.ERROR_HTTP_FORBIDDEN;
/**
@ -19,7 +21,7 @@ import static org.schabi.newpipe.BuildConfig.DEBUG;
* an error occurs or the process is stopped.
*/
public class DownloadRunnable extends Thread {
private static final String TAG = DownloadRunnable.class.getSimpleName();
private static final String TAG = "DownloadRunnable";
private final DownloadMission mMission;
private final int mId;
@ -41,13 +43,7 @@ public class DownloadRunnable extends Thread {
public void run() {
boolean retry = false;
Block block = null;
int retryCount = 0;
if (DEBUG) {
Log.d(TAG, mId + ":recovered: " + mMission.recovered);
}
SharpStream f;
try {
@ -133,6 +129,17 @@ public class DownloadRunnable extends Thread {
} catch (Exception e) {
if (!mMission.running || e instanceof ClosedByInterruptException) break;
if (e instanceof HttpError && ((HttpError) e).statusCode == ERROR_HTTP_FORBIDDEN) {
// for youtube streams. The url has expired, recover
f.close();
if (mId == 1) {
// only the first thread will execute the recovery procedure
mMission.doRecover(e);
}
return;
}
if (retryCount++ >= mMission.maxRetry) {
mMission.notifyError(e);
break;
@ -144,11 +151,7 @@ public class DownloadRunnable extends Thread {
}
}
try {
f.close();
} catch (Exception err) {
// ¿ejected media storage? ¿file deleted? ¿storage ran out of space?
}
f.close();
if (DEBUG) {
Log.d(TAG, "thread " + mId + " exited from main download loop");

View file

@ -10,9 +10,11 @@ import java.io.InputStream;
import java.net.HttpURLConnection;
import java.nio.channels.ClosedByInterruptException;
import us.shandian.giga.get.DownloadMission.HttpError;
import us.shandian.giga.util.Utility;
import static org.schabi.newpipe.BuildConfig.DEBUG;
import static us.shandian.giga.get.DownloadMission.ERROR_HTTP_FORBIDDEN;
/**
* Single-threaded fallback mode
@ -85,7 +87,7 @@ public class DownloadRunnableFallback extends Thread {
mIs = mConn.getInputStream();
byte[] buf = new byte[64 * 1024];
byte[] buf = new byte[DownloadMission.BUFFER_SIZE];
int len = 0;
while (mMission.running && (len = mIs.read(buf, 0, buf.length)) != -1) {
@ -103,6 +105,13 @@ public class DownloadRunnableFallback extends Thread {
if (!mMission.running || e instanceof ClosedByInterruptException) return;
if (e instanceof HttpError && ((HttpError) e).statusCode == ERROR_HTTP_FORBIDDEN) {
// for youtube streams. The url has expired, recover
mMission.doRecover(e);
dispose();
return;
}
if (mRetryCount++ >= mMission.maxRetry) {
mMission.notifyError(e);
return;

View file

@ -0,0 +1,79 @@
package us.shandian.giga.get;
import android.os.Parcel;
import android.os.Parcelable;
import android.support.annotation.NonNull;
import org.schabi.newpipe.extractor.MediaFormat;
import org.schabi.newpipe.extractor.stream.AudioStream;
import org.schabi.newpipe.extractor.stream.Stream;
import org.schabi.newpipe.extractor.stream.SubtitlesStream;
import org.schabi.newpipe.extractor.stream.VideoStream;
import java.io.Serializable;
public class MissionRecoveryInfo implements Serializable, Parcelable {
private static final long serialVersionUID = 0L;
//public static final String DIRECT_SOURCE = "direct-source://";
public MediaFormat format;
String desired;
boolean desired2;
int desiredBitrate;
transient int attempts = 0;
String validateCondition = null;
public MissionRecoveryInfo(@NonNull Stream stream) {
if (stream instanceof AudioStream) {
desiredBitrate = ((AudioStream) stream).average_bitrate;
desired2 = false;
} else if (stream instanceof VideoStream) {
desired = ((VideoStream) stream).getResolution();
desired2 = ((VideoStream) stream).isVideoOnly();
} else if (stream instanceof SubtitlesStream) {
desired = ((SubtitlesStream) stream).getLanguageTag();
desired2 = ((SubtitlesStream) stream).isAutoGenerated();
} else {
throw new RuntimeException("Unknown stream kind");
}
format = stream.getFormat();
if (format == null) throw new NullPointerException("Stream format cannot be null");
}
@Override
public int describeContents() {
return 0;
}
@Override
public void writeToParcel(Parcel parcel, int flags) {
parcel.writeInt(this.format.ordinal());
parcel.writeString(this.desired);
parcel.writeInt(this.desired2 ? 0x01 : 0x00);
parcel.writeInt(this.desiredBitrate);
parcel.writeString(this.validateCondition);
}
private MissionRecoveryInfo(Parcel parcel) {
this.format = MediaFormat.values()[parcel.readInt()];
this.desired = parcel.readString();
this.desired2 = parcel.readInt() != 0x00;
this.desiredBitrate = parcel.readInt();
this.validateCondition = parcel.readString();
}
public static final Parcelable.Creator<MissionRecoveryInfo> CREATOR = new Parcelable.Creator<MissionRecoveryInfo>() {
@Override
public MissionRecoveryInfo createFromParcel(Parcel source) {
return new MissionRecoveryInfo(source);
}
@Override
public MissionRecoveryInfo[] newArray(int size) {
return new MissionRecoveryInfo[size];
}
};
}