// This source code is dual-licensed under the Mozilla Public License ("MPL"), // version 1.1 and the Apache License ("ASL"), version 2.0. // // The ASL v2.0: // // --------------------------------------------------------------------------- // Copyright 2016 Pivotal Software, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // --------------------------------------------------------------------------- // // The MPL v1.1: // // --------------------------------------------------------------------------- // The contents of this file are subject to the Mozilla Public License // Version 1.1 (the "License"); you may not use this file except in // compliance with the License. You may obtain a copy of the License at // https://www.mozilla.org/MPL/ // // Software distributed under the License is distributed on an "AS IS" // basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the // License for the specific language governing rights and limitations // under the License. // // The Original Code is RabbitMQ // // The Initial Developer of the Original Code is Pivotal Software, Inc. // All Rights Reserved. // // Alternatively, the contents of this file may be used under the terms // of the Apache Standard license (the "ASL License"), in which case the // provisions of the ASL License are applicable instead of those // above. If you wish to allow use of your version of this file only // under the terms of the ASL License and not to allow others to use // your version of this file under the MPL, indicate your decision by // deleting the provisions above and replace them with the notice and // other provisions required by the ASL License. If you do not delete // the provisions above, a recipient may use your version of this file // under either the MPL or the ASL License. // --------------------------------------------------------------------------- #import "RMQSuspendResumeDispatcher.h" #import "RMQErrors.h" typedef NS_ENUM(NSUInteger, DispatcherState) { DispatcherStateOpen = 1, DispatcherStateClosedByClient, DispatcherStateClosedByServer, }; @interface RMQSuspendResumeDispatcher () @property (nonatomic, readwrite) id channel; @property (nonatomic, readwrite) id sender; @property (nonatomic, readwrite) RMQFramesetValidator *validator; @property (nonatomic, readwrite) id commandQueue; @property (nonatomic, readwrite) id enablementQueue; @property (nonatomic, readwrite) NSNumber *enableDelay; @property (nonatomic, readwrite) id delegate; @property (nonatomic, readwrite) DispatcherState state; @property (nonatomic, readwrite) BOOL disabled; @end @implementation RMQSuspendResumeDispatcher - (instancetype)initWithSender:(id)sender commandQueue:(id)commandQueue enablementQueue:(id)enablementQueue enableDelay:(NSNumber *)enableDelay { self = [super init]; if (self) { self.channel = nil; self.sender = sender; self.validator = [RMQFramesetValidator new]; self.commandQueue = commandQueue; self.enablementQueue = enablementQueue; self.enableDelay = enableDelay; self.state = DispatcherStateOpen; self.disabled = NO; } return self; } - (instancetype)initWithSender:(id)sender commandQueue:(id)commandQueue { return [self initWithSender:sender commandQueue:commandQueue enablementQueue:nil enableDelay:@0]; } - (void)activateWithChannel:(id)channel delegate:(id)delegate { self.channel = channel; self.delegate = delegate; [self.commandQueue resume]; } - (void)blockingWaitOn:(Class)method { [self.commandQueue blockingEnqueue:^{ [self processOutgoing:nil executeOrErr:^{ [self.commandQueue suspend]; }]; }]; [self.commandQueue blockingEnqueue:^{ RMQFramesetValidationResult *result = [self.validator expect:method]; if (result.error) { [self.delegate channel:self.channel error:result.error]; } }]; } - (void)sendSyncMethod:(id)method completionHandler:(void (^)(RMQFrameset *frameset))completionHandler { [self.commandQueue enqueue:^{ [self processOutgoing:method executeOrErr:^{ if ([self isClose:method]) { [self processClientClose]; } RMQFrameset *outgoingFrameset = [[RMQFrameset alloc] initWithChannelNumber:self.channelNumber method:method]; [self.commandQueue suspend]; [self.sender sendFrameset:outgoingFrameset]; }]; }]; [self.commandQueue enqueue:^{ RMQFramesetValidationResult *result = [self.validator expect:method.syncResponse]; if (self.channelIsOpen && result.error) { [self.delegate channel:self.channel error:result.error]; } else if (self.channelIsOpen) { completionHandler(result.frameset); } }]; } - (void)sendSyncMethod:(id)method { [self sendSyncMethod:method completionHandler:^(RMQFrameset *frameset) {}]; } - (void)sendSyncMethodBlocking:(id)method { [self.commandQueue blockingEnqueue:^{ [self processOutgoing:method executeOrErr:^{ RMQFrameset *frameset = [[RMQFrameset alloc] initWithChannelNumber:self.channelNumber method:method]; [self.commandQueue suspend]; [self.sender sendFrameset:frameset]; }]; }]; [self.commandQueue blockingEnqueue:^{ RMQFramesetValidationResult *result = [self.validator expect:method.syncResponse]; if (self.channelIsOpen && result.error) { [self.delegate channel:self.channel error:result.error]; } }]; } - (void)sendAsyncFrameset:(RMQFrameset *)frameset { [self.commandQueue enqueue:^{ [self processOutgoing:frameset.method executeOrErr:^{ [self.sender sendFrameset:frameset]; }]; }]; } - (void)sendAsyncMethod:(id)method { [self sendAsyncFrameset:[[RMQFrameset alloc] initWithChannelNumber:self.channelNumber method:method]]; } - (void)enqueue:(RMQOperation)operation { [self.commandQueue enqueue:operation]; } - (void)disable { self.disabled = YES; [self.commandQueue suspend]; } - (void)enable { [self.enablementQueue delayedBy:self.enableDelay enqueue:^{ self.disabled = NO; [self.commandQueue resume]; }]; } - (void)handleFrameset:(RMQFrameset *)frameset { if (!self.channelAlreadyClosedByServer && [self isClose:frameset.method]) { [self processServerClose:(RMQChannelClose *)frameset.method]; } else if (self.channelIsOpen) { [self.validator fulfill:frameset]; } if (!self.disabled) { [self.commandQueue resume]; } } # pragma mark - Private - (void)processOutgoing:(id)method executeOrErr:(void (^)())operation { if (self.channelIsOpen) { operation(); } else if (![self isClose:method]) { [self sendChannelClosedError]; } } - (void)processClientClose { self.state = DispatcherStateClosedByClient; } - (void)processServerClose:(RMQChannelClose *)close { self.state = DispatcherStateClosedByServer; NSError *error = [NSError errorWithDomain:RMQErrorDomain code:close.replyCode.integerValue userInfo:@{NSLocalizedDescriptionKey: close.replyText.stringValue}]; [self.delegate channel:self.channel error:error]; [self.sender sendFrameset:[[RMQFrameset alloc] initWithChannelNumber:self.channelNumber method:[RMQChannelCloseOk new]]]; } - (void)sendChannelClosedError { NSError *error = [NSError errorWithDomain:RMQErrorDomain code:RMQErrorChannelClosed userInfo:@{NSLocalizedDescriptionKey: @"Cannot use channel after it has been closed."}]; [self.delegate channel:self.channel error:error]; } - (BOOL)channelIsOpen { return self.state == DispatcherStateOpen; } - (BOOL)channelAlreadyClosedByServer { return self.state == DispatcherStateClosedByServer; } - (BOOL)isClose:(id)method { return [method isKindOfClass:[RMQChannelClose class]]; } - (NSNumber *)channelNumber { return self.channel.channelNumber; } @end