From 33f9f5640df5fe9f81cb5615752e3fc7ee1935e6 Mon Sep 17 00:00:00 2001 From: Cameron McOnie Date: Wed, 29 May 2019 13:57:38 +0200 Subject: [PATCH 1/4] Change aync to complete with an array of any Add a worker queue to handle object mapping --- BlockV/Core/Data Pool/DataPool.swift | 2 +- BlockV/Core/Data Pool/Regions/Region.swift | 98 +++++++++++++++++----- 2 files changed, 80 insertions(+), 20 deletions(-) diff --git a/BlockV/Core/Data Pool/DataPool.swift b/BlockV/Core/Data Pool/DataPool.swift index 2a76e573..74356b9f 100644 --- a/BlockV/Core/Data Pool/DataPool.swift +++ b/BlockV/Core/Data Pool/DataPool.swift @@ -81,7 +81,7 @@ public final class DataPool { // unable to load from disk printBV(error: "[DataPool] Unable to load region state from disk. " + err.localizedDescription) - }.then { _ -> Guarantee in + }.then { _ -> Guarantee<[Any]> in // start sync'ing region data with the server return region.synchronize() diff --git a/BlockV/Core/Data Pool/Regions/Region.swift b/BlockV/Core/Data Pool/Regions/Region.swift index b36541a2..e13b2e50 100644 --- a/BlockV/Core/Data Pool/Regions/Region.swift +++ b/BlockV/Core/Data Pool/Regions/Region.swift @@ -84,21 +84,28 @@ public class Region { /// `true` if this region has been closed. public fileprivate(set) var closed = false + + /// Concurrent worker queue. + let workerQueue = DispatchQueue(label: "io.blockv.sdk.datapool-worker", qos: .userInitiated, attributes: .concurrent) /// Re-synchronizes the region by manually fetching objects from the server again. - public func forceSynchronize() -> Guarantee { + public func forceSynchronize() -> Guarantee<[Any]> { self.synchronized = false return self.synchronize() } + + public var isSynchronizing: Bool { + return !(_syncPromise == nil) + } /// Currently executing synchronization promise. `nil` if there is no synchronization underway. - private var _syncPromise: Guarantee? + private var _syncPromise: Guarantee<[Any]>? /// Attempts to stablaize the region by querying the backend for all data. /// /// - Returns: Promise which resolves when complete. @discardableResult - public func synchronize() -> Guarantee { + public func synchronize() -> Guarantee<[Any]> { self.emit(.synchronizing, userInfo: [:]) @@ -109,18 +116,24 @@ public class Region { // remove pending error self.error = nil - self.emit(.updated) +// self.emit(.updated) //FIXME: This seems an odd place for an `.update` call // stop if already in sync if synchronized { - return Guarantee() + + Guarantee { resolver in + workerQueue.async { + resolver(self.getAll()) + } + } + } // ask the subclass to load it's data printBV(info: "[DataPool > Region] Starting synchronization for region \(self.stateKey)") // load objects - _syncPromise = self.load().map { ids -> Void in + _syncPromise = self.load().then { ids -> Guarantee<[Any]> in /* The subclass is expected to call the add method as it finds object, and then, once @@ -132,11 +145,18 @@ public class Region { if let ids = ids { self.diffedRemove(ids: ids) } - - // data is up to date - self.synchronized = true - self._syncPromise = nil - printBV(info: "[DataPool > Region] Region '\(self.stateKey)' is now in sync!") + + return Guarantee { resolver in + self.workerQueue.async { + resolver(self.getAll()) + DispatchQueue.main.async { + // data is up to date + self.synchronized = true + self._syncPromise = nil + printBV(info: "[DataPool > Region] Region '\(self.stateKey)' is now in sync!") + } + } + } }.recover { err in // error handling, notify listeners of an error @@ -144,6 +164,7 @@ public class Region { self.error = err printBV(error: "[DataPool > Region] Unable to load: " + err.localizedDescription) self.emit(.error, userInfo: ["error": err]) + return Guarantee.value([]) //FIXME: Surely a better approach is to return the errors from data pool? } // return promise @@ -348,18 +369,58 @@ public class Region { /// Returns all the objects within this region. Waits until the region is stable first. /// /// - Returns: Array of objects. Check the region-specific map() function to see what types are returned. - public func getAllStable() -> Guarantee<[Any]> { + public func getAllStable() -> Guarantee<[Any]> { //FIXME: This function is no longer needed, rather call `synchronize` directly. // synchronize now - return self.synchronize().map { - return self.getAll() - } + return self.synchronize() } + +// public func getAllConcurrently() -> [Any] { +// +// // create array of all items +// var items: [Any] = [] +// +// let max = objects.count +// +// let objs = Array(objects.values) +// +// for i in stride(from: 0, to: max - 10, by: 10) { +// +// // wrong: https://stackoverflow.com/questions/54775099/is-it-possible-to-specify-the-dispatchqueue-for-dispatchqueue-concurrentperfo +// +// DispatchQueue.concurrentPerform(iterations: 10) { index in +// +// let object = objs[i + index] +// +// // check for cached concrete type +// if let cached = object.cached { +// items.append(cached) +// return +// } +// +// // map to the plugin's intended type +// guard let mapped = self.map(object) else { +// return +// } +// +// // cache it +// object.cached = mapped +// +// // add to list +// items.append(mapped) +// +// } +// +// } +// +// return items +// +// } /// Returns all the objects within this region. Does NOT wait until the region is stable first. public func getAll() -> [Any] { - + // create array of all items var items: [Any] = [] for object in objects.values { @@ -390,9 +451,8 @@ public class Region { /// Returns an object within this region by it's ID. Waits until the region is stable first. public func getStable(id: String) -> Guarantee { - - // synchronize now - return self.synchronize().map { + + return self.synchronize().map { _ in // get item return self.get(id: id) } From 730a04a93b17b28b41ad3a462edd47b820f41cd1 Mon Sep 17 00:00:00 2001 From: Cameron McOnie Date: Thu, 30 May 2019 12:44:11 +0200 Subject: [PATCH 2/4] Add cache loaded notification --- BlockV/Core/Data Pool/Regions/Region+Notifications.swift | 3 +++ 1 file changed, 3 insertions(+) diff --git a/BlockV/Core/Data Pool/Regions/Region+Notifications.swift b/BlockV/Core/Data Pool/Regions/Region+Notifications.swift index 6046f0a3..1d20f243 100644 --- a/BlockV/Core/Data Pool/Regions/Region+Notifications.swift +++ b/BlockV/Core/Data Pool/Regions/Region+Notifications.swift @@ -14,6 +14,9 @@ import Foundation /// Possible events public enum RegionEvent: String { + /// TRiggered when a cached version of the data is available. + case loadedFromCache = "region.loadedFromCache" + /// Triggered when any data in the region changes. This also indicates that there is no longer an error. case updated = "region.updated" From b751e596183bfa24131cece1dab0501d09a6f63c Mon Sep 17 00:00:00 2001 From: Cameron McOnie Date: Thu, 30 May 2019 12:44:45 +0200 Subject: [PATCH 3/4] Replace getAllStable with synchronise --- BlockV/Core/Data Pool/Helpers/Vatom+Containment.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/BlockV/Core/Data Pool/Helpers/Vatom+Containment.swift b/BlockV/Core/Data Pool/Helpers/Vatom+Containment.swift index 7c7f0092..45e74232 100644 --- a/BlockV/Core/Data Pool/Helpers/Vatom+Containment.swift +++ b/BlockV/Core/Data Pool/Helpers/Vatom+Containment.swift @@ -33,7 +33,7 @@ extension VatomModel { if inventoryVatom == nil { // inspect the child region (owner & unowned) return DataPool.children(parentID: self.id) - .getAllStable() + .synchronize() .map { $0 as! [VatomModel] } // swiftlint:disable:this force_cast } else { From d78040aef2075ae8e997c1bab74075c1aea8e5e4 Mon Sep 17 00:00:00 2001 From: Cameron McOnie Date: Thu, 30 May 2019 12:46:03 +0200 Subject: [PATCH 4/4] Synchronise access to objects Add io queue to handle loading and saving cache Add worker queue to handle map Add sync queue to address reader-writer problem on objects --- BlockV/Core/Data Pool/Regions/Region.swift | 124 +++++++++++---------- 1 file changed, 66 insertions(+), 58 deletions(-) diff --git a/BlockV/Core/Data Pool/Regions/Region.swift b/BlockV/Core/Data Pool/Regions/Region.swift index 2f05a484..0e37da25 100644 --- a/BlockV/Core/Data Pool/Regions/Region.swift +++ b/BlockV/Core/Data Pool/Regions/Region.swift @@ -42,15 +42,36 @@ import PromiseKit /// - Persistance. public class Region { + // MARK: - Enums + enum RegionError: Error { case failedParsingResponse case failedParsingObject } - /// Serial io queue. + // MARK: - Dispatch Queues + + /// Serial i/o queue. /// /// Each subclass with have it's own io queue (the label does not dictate uniqueness). - let ioQueue = DispatchQueue(label: "io.blockv.sdk.datapool-io", qos: .utility) + let ioQueue = DispatchQueue(label: "io.blockv.sdk.datapool-io", + qos: .userInitiated) + + /// Concurrent worker queue. + /// + /// Use to process mappings and transfromations. + let workerQueue = DispatchQueue(label: "io.blockv.sdk.datapool-worker", + qos: .userInteractive, + attributes: .concurrent) + + /// Concurrent synchronization. + /// + /// Used exclusively for object synchronization queue (serial write, concurrent read). + let syncQueue = DispatchQueue(label: "io.blockv.sdk.object-sync", + qos: .userInitiated, + attributes: .concurrent) + + // MARK: - Initialization /// Constructor required init(descriptor: Any) throws { } @@ -63,13 +84,24 @@ public class Region { /// `true` if this region contains temporary objects which should not be cached to disk, `false` otherwise. let noCache = false + + private var _objects: [String: DataObject] = [:] /// All objects currently in our cache. - private(set) var objects: [String: DataObject] = [:] + private(set) var objects: [String: DataObject] { + get { // sync read (concurrent queue) + return syncQueue.sync { _objects } + } + set { + syncQueue.sync(flags: .barrier) { _objects = newValue } + } + } /// `true` if data in this region is in sync with the backend. public internal(set) var synchronized = false { didSet { + dispatchPrecondition(condition: DispatchPredicate.onQueue(.main)) + if synchronized { self.emit(.stabalized, userInfo: [:]) } else { @@ -89,12 +121,12 @@ public class Region { /// `true` if this region has been closed. public fileprivate(set) var closed = false - - /// Concurrent worker queue. - let workerQueue = DispatchQueue(label: "io.blockv.sdk.datapool-worker", qos: .userInitiated, attributes: .concurrent) /// Re-synchronizes the region by manually fetching objects from the server again. public func forceSynchronize() -> Guarantee<[Any]> { + + dispatchPrecondition(condition: DispatchPredicate.onQueue(.main)) + self.synchronized = false return self.synchronize() } @@ -108,9 +140,13 @@ public class Region { /// Attempts to stablaize the region by querying the backend for all data. /// + /// If the region is already stable, local data is returned. + /// /// - Returns: Promise which resolves when complete. @discardableResult public func synchronize() -> Guarantee<[Any]> { + + dispatchPrecondition(condition: DispatchPredicate.onQueue(.main)) self.emit(.synchronizing, userInfo: [:]) @@ -191,6 +227,8 @@ public class Region { /// Stop and destroy this region. Subclasses can override this to do stuff on close. public func close() { + dispatchPrecondition(condition: DispatchPredicate.onQueue(.main)) + // notify data pool we have closed DataPool.removeRegion(region: self) // we're closed @@ -213,6 +251,8 @@ public class Region { /// /// - Parameter objects: The objects to add func add(objects: [DataObject]) { + + dispatchPrecondition(condition: DispatchPredicate.onQueue(.main)) // go through each object for obj in objects { @@ -244,10 +284,6 @@ public class Region { } - // emit event - //FIXME: Why was this being broadcast? -// self.emit(.objectUpdated, userInfo: ["id": obj.id]) - } // Notify updated @@ -262,6 +298,8 @@ public class Region { /// /// - Parameter objects: The list of changes to perform to our data objects. func update(objects: [DataObjectUpdateRecord]) { + + dispatchPrecondition(condition: DispatchPredicate.onQueue(.main)) // batch emit events, so if a object is updated multiple times, only one event is sent var changedIDs = Set() @@ -331,6 +369,8 @@ public class Region { /// /// - Parameter ids: The IDs of objects to remove func remove(ids: [String]) { + + dispatchPrecondition(condition: DispatchPredicate.onQueue(.main)) // remove all data objects with the specified IDs var didUpdate = false @@ -371,55 +411,13 @@ public class Region { return object } - /// Returns all the objects within this region. Waits until the region is stable first. - /// - /// - Returns: Array of objects. Check the region-specific map() function to see what types are returned. - public func getAllStable() -> Guarantee<[Any]> { //FIXME: This function is no longer needed, rather call `synchronize` directly. - - // synchronize now - return self.synchronize() - - } - -// public func getAllConcurrently() -> [Any] { -// -// // create array of all items -// var items: [Any] = [] -// -// let max = objects.count -// -// let objs = Array(objects.values) -// -// for i in stride(from: 0, to: max - 10, by: 10) { -// -// // wrong: https://stackoverflow.com/questions/54775099/is-it-possible-to-specify-the-dispatchqueue-for-dispatchqueue-concurrentperfo -// -// DispatchQueue.concurrentPerform(iterations: 10) { index in +// /// Returns all the objects within this region. Waits until the region is stable first. +// /// +// /// - Returns: Array of objects. Check the region-specific map() function to see what types are returned. +// public func getAllStable() -> Guarantee<[Any]> { //FIXME: This function is no longer needed, rather call `synchronize` directly. // -// let object = objs[i + index] -// -// // check for cached concrete type -// if let cached = object.cached { -// items.append(cached) -// return -// } -// -// // map to the plugin's intended type -// guard let mapped = self.map(object) else { -// return -// } -// -// // cache it -// object.cached = mapped -// -// // add to list -// items.append(mapped) -// -// } -// -// } -// -// return items +// // synchronize now +// return self.synchronize() // // } @@ -493,6 +491,8 @@ public class Region { /// Load objects from local storage. func loadFromCache() -> Promise { + dispatchPrecondition(condition: DispatchPredicate.onQueue(.main)) + return Promise { (resolver: Resolver) in ioQueue.async { @@ -543,6 +543,8 @@ public class Region { DispatchQueue.main.async { // add objects self.add(objects: cleanObjects) + + self.emit(.loadedFromCache) } // done @@ -560,6 +562,8 @@ public class Region { /// Saves the region to local storage. func save() { + + dispatchPrecondition(condition: DispatchPredicate.onQueue(.main)) // cancel the pending save task if saveTask != nil { @@ -624,6 +628,8 @@ public class Region { /// - value: The new value /// - Returns: An undo function func preemptiveChange(id: String, keyPath: String, value: Any) -> UndoFunction { + + dispatchPrecondition(condition: DispatchPredicate.onQueue(.main)) // get object. If it doesn't exist, do nothing and return an undo function which does nothing. guard let object = objects[id], object.data != nil else { @@ -665,6 +671,8 @@ public class Region { /// - Parameter id: The object ID to remove /// - Returns: An undo function func preemptiveRemove(id: String) -> UndoFunction { + + dispatchPrecondition(condition: DispatchPredicate.onQueue(.main)) // remove object guard let removedObject = objects.removeValue(forKey: id) else {