// This source code is dual-licensed under the Mozilla Public License ("MPL"), // version 1.1 and the Apache License ("ASL"), version 2.0. // // The ASL v2.0: // // --------------------------------------------------------------------------- // Copyright 2016 Pivotal Software, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // --------------------------------------------------------------------------- // // The MPL v1.1: // // --------------------------------------------------------------------------- // The contents of this file are subject to the Mozilla Public License // Version 1.1 (the "License"); you may not use this file except in // compliance with the License. You may obtain a copy of the License at // https://www.mozilla.org/MPL/ // // Software distributed under the License is distributed on an "AS IS" // basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the // License for the specific language governing rights and limitations // under the License. // // The Original Code is RabbitMQ // // The Initial Developer of the Original Code is Pivotal Software, Inc. // All Rights Reserved. // // Alternatively, the contents of this file may be used under the terms // of the Apache Standard license (the "ASL License"), in which case the // provisions of the ASL License are applicable instead of those // above. If you wish to allow use of your version of this file only // under the terms of the ASL License and not to allow others to use // your version of this file under the MPL, indicate your decision by // deleting the provisions above and replace them with the notice and // other provisions required by the ASL License. If you do not delete // the provisions above, a recipient may use your version of this file // under either the MPL or the ASL License. // --------------------------------------------------------------------------- #import "RMQTransactionalConfirmations.h" #import "RMQConfirmationTransaction.h" @interface RMQTransactionalConfirmations () @property (nonatomic, readwrite) NSUInteger offset; @property (nonatomic, readwrite) NSUInteger nextPublishSequenceNumber; @property (nonatomic, readwrite) NSMutableArray *transactions; @property (nonatomic, readwrite) NSUInteger transactionIndex; @property (nonatomic, readwrite) id delayQueue; @end @implementation RMQTransactionalConfirmations - (instancetype)initWithDelayQueue:(id)queue { self = [super init]; if (self) { self.offset = 0; self.nextPublishSequenceNumber = 0; self.transactions = [NSMutableArray new]; self.delayQueue = queue; [self addTransaction]; self.transactionIndex = 0; } return self; } - (instancetype)init { [self doesNotRecognizeSelector:_cmd]; return nil; } - (void)enable { self.nextPublishSequenceNumber = 1; } - (BOOL)isEnabled { return self.nextPublishSequenceNumber > 0; } - (void)recover { self.offset = self.nextPublishSequenceNumber - 1; } - (NSNumber *)addPublication { NSNumber *publicationSequenceNumber = @(self.nextPublishSequenceNumber); if (self.isEnabled) { [self.currentTransaction addUnconfirmed:publicationSequenceNumber]; self.nextPublishSequenceNumber++; } return publicationSequenceNumber; } - (void)addCallbackWithTimeout:(NSNumber *)timeoutInSecs callback:(RMQConfirmationCallback)callback { [self.currentTransaction setCallback:callback timeout:timeoutInSecs]; [self.currentTransaction completeIfReady]; [self addTransaction]; } - (void)ack:(RMQBasicAck *)ack { for (NSNumber *tag in [self offsetTags:ack]) { [[self transactionForUnconfirmedTag:tag] ack:tag]; } } - (void)nack:(RMQBasicNack *)nack { for (NSNumber *tag in [self offsetTags:nack]) { [[self transactionForUnconfirmedTag:tag] nack:tag]; } } #pragma mark - Private - (RMQConfirmationTransaction *)transactionForUnconfirmedTag:(NSNumber *)tag { NSPredicate *pred = [NSPredicate predicateWithBlock:^BOOL(RMQConfirmationTransaction *tx, NSDictionary * bindings) { return [tx isUnconfirmed:tag]; }]; NSArray *filtered = [self.transactions filteredArrayUsingPredicate:pred]; return filtered.count > 0 ? filtered[0] : nil; } - (RMQConfirmationTransaction *)currentTransaction { return self.transactions[self.transactionIndex]; } - (void)addTransaction { [self.transactions addObject:[[RMQConfirmationTransaction alloc] initWithDelayQueue:self.delayQueue]]; self.transactionIndex++; } - (NSArray *)offsetTags:(id)ackOrNack { RMQBasicAck *ack = (RMQBasicAck *)ackOrNack; uint64_t highestTag = ack.deliveryTag.integerValue; BOOL isMultiple = ack.options & RMQBasicAckMultiple; uint64_t lowestTag = isMultiple ? 1 : highestTag; NSMutableArray *tags = [NSMutableArray new]; for (uint64_t tag = lowestTag; tag <= highestTag; tag++) { [tags addObject:@(tag + self.offset)]; } return tags; } @end