Commit 3a000574 authored by Vladislav Bogdashkin's avatar Vladislav Bogdashkin 🎣

tours and files db model, download use case

parent 3dd76784
package com.biganto.visual.roompark.data.repository.db package com.biganto.visual.roompark.data.repository.db
import com.biganto.visual.roompark.data.repository.db.requrey.RevisionString
import com.biganto.visual.roompark.data.repository.db.requrey.model.* import com.biganto.visual.roompark.data.repository.db.requrey.model.*
import com.biganto.visual.roomparkvr.data.repository.db.requery.model.DownloadState
import com.biganto.visual.roomparkvr.data.repository.db.requery.model.TourPreviewEntity
import io.reactivex.Completable import io.reactivex.Completable
import io.reactivex.Flowable
import io.reactivex.Observable import io.reactivex.Observable
import io.reactivex.Single import io.reactivex.Single
import io.requery.Persistable import io.requery.Persistable
import io.requery.reactivex.ReactiveResult import io.requery.reactivex.ReactiveResult
import io.requery.reactivex.ReactiveScalar
/** /**
* Created by Vladislav Bogdashkin on 29.10.2019. * Created by Vladislav Bogdashkin on 29.10.2019.
...@@ -36,4 +41,31 @@ interface IDb { ...@@ -36,4 +41,31 @@ interface IDb {
fun setDealReadState(id: String, state: Boolean): Completable fun setDealReadState(id: String, state: Boolean): Completable
fun saveSubscription(subscription: SubscriptionEntity): Single<SubscriptionEntity> fun saveSubscription(subscription: SubscriptionEntity): Single<SubscriptionEntity>
fun getSubscription(id: Int): ReactiveResult<SubscriptionEntity> fun getSubscription(id: Int): ReactiveResult<SubscriptionEntity>
fun deleteTourPreview(id: String): Int?
fun deleteTourPreview(entity: TourPreviewEntity)
fun deleteFiles(entity: List<FileEntity>)
fun deleteFile(entity: FileEntity)
fun deleteTourFilesJunction(tourId: String): ReactiveScalar<Int>
fun deleteTourFilesJunction(entity: List<TourFileJunctionEntity>)
fun getTourFilesJunctionUniqueFiles(tourId: String): List<TourFileJunctionEntity?>
fun getTourFilesJunction(tourId: String): ReactiveResult<TourFileJunctionEntity>
fun upsertFileEntity(entity: List<FileEntity>): Observable<Iterable<FileEntity>>
fun upsertFileEntity(entity: FileEntity): Observable<FileEntity>
fun flowableFileEntityes(uris: List<RevisionString>): Flowable<FileEntity>
fun getDownloadedSumFileEntityes(uris: List<RevisionString>): Long
fun fetchTouresWithStates(states: List<DownloadState>): MutableList<TourPreviewEntity>?
fun fetchFileEntityes(uris: List<RevisionString>): MutableList<FileEntity>
fun pushFileEntities(uris: List<FileEntity>): Observable<Iterable<FileEntity>>
fun getFileEntity(uri: RevisionString): ReactiveResult<FileEntity>
fun getTourFiles(entity: TourPreviewEntity): MutableList<TourFileJunctionEntity>
fun getTourFiles(tourId: String): MutableList<TourFileJunctionEntity>
fun getTourFilesObservable(tourId: String): ReactiveResult<TourFileJunctionEntity>
fun upsertTourPreview(list: List<TourPreviewEntity>): Observable<Iterable<TourPreviewEntity>>
fun upsertTourPreview(entity: TourPreviewEntity): Observable<TourPreviewEntity>
fun upsertTourFileJunction(entity: List<TourFileJunctionEntity>): Observable<Iterable<TourFileJunctionEntity>>
fun upsertTourFileJunction(entity: TourFileJunctionEntity): Observable<TourFileJunctionEntity>
fun observableTourDownloadState(): Flowable<TourPreviewEntity>
fun getFileEntitysOrDefault(uri: RevisionString): FileEntity
fun getTourPreview(tourId: String): ReactiveResult<TourPreviewEntity>
fun getTourPreviewsObservableResult(estateId: Int): Observable<ReactiveResult<TourPreviewEntity>>
} }
\ No newline at end of file
...@@ -5,11 +5,16 @@ import com.biganto.visual.roompark.Models ...@@ -5,11 +5,16 @@ import com.biganto.visual.roompark.Models
import com.biganto.visual.roompark.data.repository.db.IDb import com.biganto.visual.roompark.data.repository.db.IDb
import com.biganto.visual.roompark.data.repository.db.requrey.model.* import com.biganto.visual.roompark.data.repository.db.requrey.model.*
import com.biganto.visual.roompark.di.dagger.DATABASE_VERSION import com.biganto.visual.roompark.di.dagger.DATABASE_VERSION
import com.biganto.visual.roomparkvr.data.repository.db.requery.model.DownloadState
import com.biganto.visual.roomparkvr.data.repository.db.requery.model.TourPreview
import com.biganto.visual.roomparkvr.data.repository.db.requery.model.TourPreviewEntity
import dagger.Module import dagger.Module
import dagger.Provides import dagger.Provides
import io.reactivex.BackpressureStrategy
import io.reactivex.Completable import io.reactivex.Completable
import io.reactivex.Observable import io.reactivex.Observable
import io.reactivex.Single import io.reactivex.Single
import io.reactivex.schedulers.Schedulers
import io.requery.Persistable import io.requery.Persistable
import io.requery.android.sqlite.DatabaseSource import io.requery.android.sqlite.DatabaseSource
import io.requery.reactivex.KotlinReactiveEntityStore import io.requery.reactivex.KotlinReactiveEntityStore
...@@ -185,4 +190,195 @@ class RequeryRepository @Inject constructor( ...@@ -185,4 +190,195 @@ class RequeryRepository @Inject constructor(
.where(SubscriptionEntity.ID.eq(id)) .where(SubscriptionEntity.ID.eq(id))
.get() .get()
///region tours files
override fun getTourPreviewsObservableResult(estateId: Int) = store
.select(TourPreviewEntity::class)
// .join(UserEntity::class).on(UserEntity::uuid.eq(estateId))
.where(TourPreviewEntity.ESTATE_ID.eq(estateId))
.get()
.observableResult()
override fun getTourPreview(tourId: String) = store
.select(TourPreviewEntity::class)
.where(TourPreviewEntity.ID.eq(tourId))
.get()
override fun upsertTourPreview(list: List<TourPreviewEntity>) = store
.upsert(list)
.doOnSuccess { Timber.d("Upsertd succses %s", it.count()) }
.toObservable()
override fun upsertTourPreview(entity: TourPreviewEntity) = store
.upsert(entity)
.toObservable()
override fun upsertTourFileJunction(entity: List<TourFileJunctionEntity>)
: Observable<Iterable<TourFileJunctionEntity>> = store.upsert(entity).toObservable()
override fun upsertTourFileJunction(entity: TourFileJunctionEntity) = store
.upsert(entity)
.toObservable()
override fun observableTourDownloadState() = store
.select(TourPreviewEntity::class)
.where(TourPreviewEntity.DOWNLOADED.notIn(arrayListOf(DownloadState.NotDownloaded, DownloadState.MetaPreparation)))
.get()
// .flowable()
// .
.observableResult()
.flatMap { it.observable() }
.toFlowable(BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io())
override fun getFileEntitysOrDefault(uri: RevisionString) = store
.select(FileEntity::class)
.where(FileEntity.URI.eq(uri))
.get()
.firstOr {
val entity = FileEntity()
entity.setUri(uri)
entity.setDownloadedSize(0)
entity.setTotalSize(0)
entity.setDownloaded(false)
entity
}
override fun getFileEntity(uri: RevisionString) = store
.select(FileEntity::class)
.where(FileEntity.URI.eq(uri))
.get()
override fun getTourFiles(entity : TourPreviewEntity ) =store
.select(TourFileJunctionEntity::class)
.where(TourFileJunctionEntity.TOUR.eq(entity.id))
.get()
.toList()
override fun getTourFiles(tourId : String) =store
.select(TourFileJunctionEntity::class)
.where(TourFileJunctionEntity.TOUR.eq(tourId))
.get()
.toList()
override fun getTourFilesObservable(tourId : String) =store
.select(TourFileJunctionEntity::class)
.where(TourFileJunctionEntity.TOUR.eq(tourId))
.get()
override fun pushFileEntities(uris: List<FileEntity>) = store
.select(FileEntity::class)
.where(FileEntity.URI.`in`(uris.map { it.uri }.toList()))
.get()
.observable()
.toList()
.map {existedUris-> uris.filter{!existedUris.map {file -> file.uri.revisionUri() }.contains(it.uri.revisionUri()) }.toList()}
.flatMapObservable{ store.insert(it).toObservable() }
override fun fetchFileEntityes(uris: List<RevisionString>) = store
.select(FileEntity::class)
.where(FileEntity.URI.`in`(uris))
.get()
.toList()
override fun fetchTouresWithStates(states: List<DownloadState>) = store
.select(TourPreviewEntity::class)
.where(TourPreviewEntity.DOWNLOADED.`in`(states))
.get()
.toList()
override fun getDownloadedSumFileEntityes(uris: List<RevisionString>) = store
.select(FileEntity::class)
.where(FileEntity.URI.`in`(uris))
.get()
.map{ it.downloadedSize }
.sum()
// .observable()
override fun flowableFileEntityes(uris: List<RevisionString>) = store
.select(FileEntity::class)
.where(FileEntity.URI.`in`(uris))
.get()
.flowable()
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
override fun upsertFileEntity(entity: FileEntity) = store
.upsert(entity)
.toObservable()
override fun upsertFileEntity(entity: List<FileEntity>) = store
.upsert(entity)
.toObservable()
override fun getTourFilesJunction(tourId:String) = store
.select(TourFileJunctionEntity::class)
.where(TourFileJunctionEntity.TOUR.eq(tourId))
.get()
override fun getTourFilesJunctionUniqueFiles(tourId:String):
List<TourFileJunctionEntity?> =
store
.raw(TourFileJunctionEntity::class,
"SELECT tfj.* from TourFileJunction tfj " +
"join TourFileJunction tfj2 on tfj2.file = tfj.File " +
"WHERE tfj2.tour = $tourId " +
"GROUP BY tfj.file " +
"HAVING COUNT(tfj.file) < 2"
)
.toList()
private fun <E:Persistable> deleteBlocking(entity:E) = store
.delete(entity)
.blockingAwait()
private fun <E:List<Persistable>> deleteBlocking(entity:E) = store
.delete(entity)
.blockingAwait()
override fun deleteTourFilesJunction(entity: List<TourFileJunctionEntity>) =
deleteBlocking(entity)
override fun deleteTourFilesJunction(tourId:String) = store
.delete(TourFileJunctionEntity::class)
.where(TourFileJunctionEntity.TOUR.eq(tourId))
.get()
override fun deleteFile(entity:FileEntity) = deleteBlocking(entity)
override fun deleteFiles(entity:List<FileEntity>) = deleteBlocking(entity)
override fun deleteTourPreview(entity: TourPreviewEntity) = deleteBlocking(entity)
override fun deleteTourPreview(id:String) = store
.delete(TourPreview::class)
.where(TourPreviewEntity.ID.eq(id))
.get()
.single()
.blockingGet()
///endregion
} }
package com.biganto.visual.roompark.domain.use_case
import android.app.Application
import android.media.MediaScannerConnection
import com.biganto.visual.roompark.data.repository.api.biganto.IBigantoApi
import com.biganto.visual.roompark.data.repository.db.IDb
import com.biganto.visual.roompark.data.repository.db.requrey.RevisionString
import com.biganto.visual.roompark.data.repository.db.requrey.model.FileEntity
import com.biganto.visual.roompark.data.repository.db.requrey.model.TourFileJunctionEntity
import com.biganto.visual.roompark.data.repository.db.requrey.model.fromRaw
import com.biganto.visual.roompark.data.repository.file.FileModule
import com.biganto.visual.roomparkvr.data.repository.db.requery.model.DownloadState
import com.biganto.visual.roomparkvr.data.repository.db.requery.model.TourPreviewEntity
import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import okhttp3.ResponseBody
import okio.Okio
import timber.log.Timber
import java.io.File
import java.util.*
import java.util.concurrent.TimeUnit
import javax.inject.Inject
/**
* Created by Vladislav Bogdashkin on 02.08.2019.
*/
private const val META_PREDICTION="/tourMeta_"
private const val META_FILE_TYPE=".json"
class DownloadUseCase @Inject constructor(
private val db: IDb,
private val api: IBigantoApi,
private val fileModule: FileModule,
private val context: Application
) {
private fun writeFile(
response: ResponseBody,
model: TourFileData,
token: CancellationToken
): Observable<TourFileData> {
return Observable.create<TourFileData> { sub ->
try {
if (token.isCancelled) {
sub.onNext(model)
sub.onComplete()
}
val fileStorage = fileModule.getFile(model.fileUrl.uri())
val sink =
if (model.fileDownloadedSize>0)
Okio.buffer(Okio.appendingSink(fileStorage))
else Okio.buffer(Okio.sink(fileStorage))
val buffer = sink.buffer()
var read = 0L
val step = 4096
val source = response.source()
var stop: Boolean = token.isCancelled
sink.use {
while (!stop && { read = source.read(buffer, step.toLong());read }() != -1L) {
model.tempDownloadedSize += read
model.fileDownloadedSize += read
if (model.tempOverallFileSize == 0L)
model.tempTourTotalDiff += model.tempDownloadedSize
if (token.isCancelled) {
model.isDownloaded = false
model.fileDownloadedSize = 0
sub.onComplete()
stop = true
source.buffer.flush()
}
}
}
model.isDownloaded = (source.exhausted()
&& (model.fileDownloadedSize == model.tempOverallFileSize
|| model.tempOverallFileSize == 0L))
sub.onNext(model.copy())
model.tempTourTotalDiff = 0
model.tempDownloadedSize = 0
sub.onComplete()
sink.close()
} catch (e: Throwable) {
Timber.e(e)
if (!sub.isDisposed)
sub.onError(e)
token.isCancelled = true
}
}
}
private fun mergeFiles(files: List<FileEntity>) {
Timber.d("Nerge files")
files.forEach { file ->
val entity: FileEntity? = db.getFileEntity(file.uri).firstOrNull()
entity?.let {
file.setDownloaded(it.isDownloaded)
file.setDownloadedSize(it.downloadedSize)
file.setTotalSize(it.totalSize)
}
}
Timber.d("save files: ${files.size}")
db.upsertFileEntity(files)?.blockingSubscribe { Timber.d("file saved") }
}
@Volatile
private var tourDbModel: TourPreviewEntity? = null
private fun setDownloadInfo(
id: String,
downloadedSize: Long? = null
, downloadedDiffSize: Long? = null
, totalSize: Long? = null
, resolution: Int? = null
, filesCount: Int? = null
, tempLoadedFiles: Int? = null
, totalSizedDiffSize: Long? = null
) {
if (tourDbModel?.id != id)
throw error("Wrong tour id")
tourDbModel?.let { entity ->
downloadedSize?.let { entity.downloadedSize = it }
downloadedDiffSize?.let { entity.downloadedSize += it }
totalSize?.let { entity.overallSize = it }
resolution?.let { entity.targetResolution = it }
filesCount?.let { entity.overallFiles = it }
tempLoadedFiles?.let { entity.downloadedFiles += it }
totalSizedDiffSize?.let { entity.overallSize += it }
if (entity.downloadedFiles == entity.overallFiles)
entity.isDownloaded = DownloadState.Downloaded
}
Timber.d(" tour: ${tourDbModel?.downloadedFiles} / ${tourDbModel?.overallFiles}")
}
private fun observableTourDownloading(tour: TourPreviewEntity, token: CancellationToken) =
api.getTourFiles(tour.id, tour.targetResolution.toString())
.map { tourDbModel = tour;it.first() }
.map { raw ->
var downloadedSize = 0L
var totalSize = 0L
val fileEntities = raw.files.map(::fromRaw)
mergeFiles(fileEntities)
val jlist = db.getTourFilesJunction(tour.id).toList()
val junctionList = fileEntities
.map {file ->
val entity = jlist.firstOrNull{it.tour == tour.id && it.file == file.uri}
?: TourFileJunctionEntity()
entity.setTour(tour.id)
entity.setFile(file.uri)
downloadedSize += file.downloadedSize
totalSize += file.totalSize
entity
}
setDownloadInfo(
raw.id.toString()
, downloadedSize = downloadedSize
, totalSize = totalSize
, resolution = raw.resolution
, filesCount = raw.files.count()
)
junctionList
}
.doOnNext { junctionList ->
db.upsertTourFileJunction(junctionList)?.subscribe { Timber.d("junction upserted") }
}
.doOnNext { _ ->
tourDbModel?.let {
db.upsertTourPreview(it)?.subscribe { Timber.d("tour upserted") }
}
}
.flatMapIterable { it }
.flatMap { junction ->
db.getFileEntity(junction.file)
.observable()
.map { entity ->
TourFileData(
fileUrl = junction.file
, tourId = junction.tour
, tempDownloadedSize = 0L
, tempOverallFileSize = entity.totalSize
, fileDownloadedSize = entity.downloadedSize
, tempTourTotalDiff = 0L
, isDownloaded = entity.isDownloaded
)
}
}
.toFlowable(BackpressureStrategy.BUFFER)
.parallel(4)
.runOn(Schedulers.io())
.filter { !token.isCancelled }
.flatMap { model ->
if (model.isDownloaded)
return@flatMap Flowable.just(model)
var header: HashMap<String, String>? = null
if (model.fileDownloadedSize > 0){
header = hashMapOf(Pair("Range", "bytes=${model.fileDownloadedSize}-"))
Timber.w("trying to continue download file " +
"url by: ${model.fileUrl}" +
"size is: ${model.fileDownloadedSize}/${model.tempOverallFileSize}" +
"and header is: $header")
}
api.downloadFile(model.fileUrl.revisionUri(), header)
.doOnError {
Timber.e(it)
}
.flatMap<TourFileData> {
writeFile(it, model, token)
.toFlowable(BackpressureStrategy.BUFFER)
.doOnCancel { Timber.d("CANCELLED") }
}
.flatMap { downloadInfo ->
db.upsertFileEntity(
FileEntity().also{
it.setUri(downloadInfo.fileUrl)
it.setDownloadedSize(downloadInfo.fileDownloadedSize)
it.setTotalSize(downloadInfo.tempOverallFileSize)
it.setDownloaded(downloadInfo.isDownloaded)})
.toFlowable(BackpressureStrategy.BUFFER)
.map { downloadInfo }
}
}
.sequential()
.toObservable()
// .buffer(15L,TimeUnit.MILLISECONDS)
// .flatMapIterable { it }
.map { model ->
setDownloadInfo(
model.tourId
, totalSizedDiffSize = model.tempTourTotalDiff
, downloadedDiffSize = model.tempDownloadedSize
, tempLoadedFiles = if (model.isDownloaded) 1 else null
)
model.tempDownloadedSize = 0
model.tourId
}
.delay(14L, TimeUnit.MILLISECONDS)
// .sample(37L, TimeUnit.MILLISECONDS)
.flatMap { db.upsertTourPreview(tourDbModel!!) }
fun startTourDownloading(tourId: String, cancellataionToken: CancellationToken)
: Observable<TourPreviewEntity> = db.getTourPreview(tourId).observable()
.doOnNext {
it.isDownloaded = DownloadState.Downloading
it.downloadedFiles = 0
it.downloadedSize = 0L
it.tempSize = it.overallSize // <- overall changes due downloading!!
it.overallSize = 0L
}
.flatMap { db.upsertTourPreview(it) }
.flatMap(::getMeta)
.doOnError(Timber::e)
.flatMap { observableTourDownloading(it, cancellataionToken) }
private fun getMeta(tour: TourPreviewEntity) =
api.getTourMetaAsString(tour.id)
?.doOnNext { meta ->
tour.let {
val metaUri = RevisionString("$META_PREDICTION${tour.id}$META_FILE_TYPE")
it.setMetaFileEntityId(metaUri)
fileModule.saveFileToDisk(fileModule.getFile(metaUri.uri()), meta)
}
}
?.map { tour }
?.onErrorReturn {
tour.isDownloaded = DownloadState.Crushed
db.upsertTourPreview(tour)?.blockingSubscribe()
tour
}
//#endregion oldMethod
private fun refreshGallery(file: File) {
MediaScannerConnection.scanFile(
context, arrayOf(file.path), null
)
{ path, uri ->
{}//Timber.d("Scanned $path")
}
}
data class TourFileData(
val fileUrl: RevisionString,
val tourId: String,
var tempDownloadedSize: Long = 0L,
var tempOverallFileSize: Long = 0L,
var fileDownloadedSize: Long = 0L,
var tempTourTotalDiff: Long = 0L,
var isDownloaded: Boolean = false
)
data class CancellationToken(var isCancelled: Boolean)
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment