// This source code is dual-licensed under the Mozilla Public License ("MPL"), // version 1.1 and the Apache License ("ASL"), version 2.0. // // The ASL v2.0: // // --------------------------------------------------------------------------- // Copyright 2016 Pivotal Software, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // --------------------------------------------------------------------------- // // The MPL v1.1: // // --------------------------------------------------------------------------- // The contents of this file are subject to the Mozilla Public License // Version 1.1 (the "License"); you may not use this file except in // compliance with the License. You may obtain a copy of the License at // https://www.mozilla.org/MPL/ // // Software distributed under the License is distributed on an "AS IS" // basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the // License for the specific language governing rights and limitations // under the License. // // The Original Code is RabbitMQ // // The Initial Developer of the Original Code is Pivotal Software, Inc. // All Rights Reserved. // // Alternatively, the contents of this file may be used under the terms // of the Apache Standard license (the "ASL License"), in which case the // provisions of the ASL License are applicable instead of those // above. 
// If you wish to allow use of your version of this file only
// under the terms of the ASL License and not to allow others to use
// your version of this file under the MPL, indicate your decision by
// deleting the provisions above and replace them with the notice and
// other provisions required by the ASL License. If you do not delete
// the provisions above, a recipient may use your version of this file
// under either the MPL or the ASL License.
// ---------------------------------------------------------------------------

#import "RMQConnection.h"
#import "RMQConnectionRecover.h"
#import "RMQGCDHeartbeatSender.h"
#import "RMQGCDSerialQueue.h"
#import "RMQHandshaker.h"
#import "RMQMethods.h"
#import "RMQMultipleChannelAllocator.h"
#import "RMQProtocolHeader.h"
#import "RMQQueuingConnectionDelegateProxy.h"
#import "RMQReader.h"
#import "RMQSemaphoreWaiterFactory.h"
#import "RMQTCPSocketTransport.h"
#import "RMQURI.h"
#import "RMQTickingClock.h"
#import "RMQTLSOptions.h"
#import "RMQErrors.h"
#import "RMQFrame.h"
#import "RMQProcessInfoNameGenerator.h"

/// Default channel-max value used by the convenience initializers.
NSInteger const RMQChannelLimit = 65535;

@interface RMQConnection ()
@property (strong, nonatomic, readwrite) id transport;
@property (nonatomic, readwrite) RMQReader *reader;
@property (nonatomic, readwrite) id channelAllocator;
@property (nonatomic, readwrite) id frameHandler;
@property (nonatomic, readwrite) id commandQueue;
@property (nonatomic, readwrite) id waiterFactory;
@property (nonatomic, readwrite) id heartbeatSender;
@property (nonatomic, weak, readwrite) id delegate;
@property (nonatomic, readwrite) id channelZero;
@property (nonatomic, readwrite) RMQConnectionConfig *config;
@property (nonatomic, readwrite) NSMutableDictionary *userChannels;
@property (nonatomic, readwrite) NSNumber *frameMax;
/// YES between a completed handshake and the next close/disconnect; gates -sendFrameset:.
@property (nonatomic, readwrite) BOOL handshakeComplete;
@property (nonatomic, readwrite) NSNumber *handshakeTimeout;
@end

@implementation RMQConnection

#pragma mark - Initializers

/// Initializer that every other initializer in this file funnels into.
///
/// Stores the collaborators, registers this connection as the transport's
/// delegate and as the channel allocator's sender, builds the main frame
/// reader (with this connection as its frame handler), then allocates and
/// activates the first channel as `channelZero`.
///
/// Collaborators are messaged with explicit selectors rather than dot-syntax
/// because they are declared as plain `id` in this file.
- (instancetype)initWithTransport:(id)transport
                           config:(RMQConnectionConfig *)config
                 handshakeTimeout:(NSNumber *)handshakeTimeout
                 channelAllocator:(nonnull id)channelAllocator
                     frameHandler:(nonnull id)frameHandler
                         delegate:(id)delegate
                     commandQueue:(nonnull id)commandQueue
                    waiterFactory:(nonnull id)waiterFactory
                  heartbeatSender:(nonnull id)heartbeatSender {
    self = [super init];
    if (self) {
        self.config = config;
        self.handshakeComplete = NO;
        self.handshakeTimeout = handshakeTimeout;
        self.frameMax = config.frameMax;

        self.transport = transport;
        [self.transport setDelegate:self];

        self.channelAllocator = channelAllocator;
        [self.channelAllocator setSender:self];

        self.frameHandler = frameHandler;
        self.reader = [[RMQReader alloc] initWithTransport:self.transport frameHandler:self];
        self.userChannels = [NSMutableDictionary new];
        self.delegate = delegate;
        self.commandQueue = commandQueue;
        self.waiterFactory = waiterFactory;
        self.heartbeatSender = heartbeatSender;

        // The first channel handed out by the allocator is kept as channel zero.
        self.channelZero = [self.channelAllocator allocate];
        [self.channelZero activateWithDelegate:self.delegate];
    }
    return self;
}

/// Fully parameterised convenience initializer: builds the default TCP
/// transport, channel allocator, queuing delegate proxy, semaphore waiter
/// factory, GCD heartbeat sender, serial command queue and recovery strategy
/// from the given URI and options, then calls the initializer above.
///
/// @param uri              Connection URI; host, port, credentials and vhost are read from it.
/// @param syncTimeout      Used both as the channel sync timeout and as the handshake timeout.
/// @param delegate         Wrapped in a queuing proxy that is constructed with `delegateQueue`.
/// @param recoveryInterval Interval handed to the recovery strategy.
/// @param recoveryAttempts Attempt limit handed to the recovery strategy.
/// @param shouldRecoverFromConnectionClose When NO, the recovery strategy is created with `onlyErrors:YES`.
- (nonnull instancetype)initWithUri:(NSString *)uri
                         tlsOptions:(RMQTLSOptions *)tlsOptions
                         channelMax:(NSNumber *)channelMax
                           frameMax:(NSNumber *)frameMax
                          heartbeat:(NSNumber *)heartbeat
                        syncTimeout:(NSNumber *)syncTimeout
                           delegate:(id)delegate
                      delegateQueue:(dispatch_queue_t)delegateQueue
                       recoverAfter:(nonnull NSNumber *)recoveryInterval
                   recoveryAttempts:(nonnull NSNumber *)recoveryAttempts
         recoverFromConnectionClose:(BOOL)shouldRecoverFromConnectionClose {
    // NOTE(review): the parse error is collected but never inspected, so a
    // malformed URI silently yields a nil RMQURI (and therefore nil host, port
    // and credentials below). Kept as-is because this initializer is declared
    // nonnull; confirm the intended failure policy before changing it.
    NSError *error = nil;
    RMQURI *rmqURI = [RMQURI parse:uri error:&error];

    RMQTCPSocketTransport *transport = [[RMQTCPSocketTransport alloc] initWithHost:rmqURI.host
                                                                              port:rmqURI.portNumber
                                                                        tlsOptions:tlsOptions];
    RMQMultipleChannelAllocator *allocator =
        [[RMQMultipleChannelAllocator alloc] initWithChannelSyncTimeout:syncTimeout];
    RMQQueuingConnectionDelegateProxy *delegateProxy =
        [[RMQQueuingConnectionDelegateProxy alloc] initWithDelegate:delegate queue:delegateQueue];
    RMQSemaphoreWaiterFactory *waiterFactory = [RMQSemaphoreWaiterFactory new];
    RMQGCDHeartbeatSender *heartbeatSender =
        [[RMQGCDHeartbeatSender alloc] initWithTransport:transport clock:[RMQTickingClock new]];
    RMQCredentials *credentials = [[RMQCredentials alloc] initWithUsername:rmqURI.username
                                                                  password:rmqURI.password];

    RMQProcessInfoNameGenerator *nameGenerator = [RMQProcessInfoNameGenerator new];
    RMQGCDSerialQueue *commandQueue =
        [[RMQGCDSerialQueue alloc] initWithName:[nameGenerator generateWithPrefix:@"connection-commands"]];

    RMQConnectionRecover *recovery = [[RMQConnectionRecover alloc] initWithInterval:recoveryInterval
                                                                       attemptLimit:recoveryAttempts
                                                                         onlyErrors:!shouldRecoverFromConnectionClose
                                                                    heartbeatSender:heartbeatSender
                                                                       commandQueue:commandQueue
                                                                           delegate:delegateProxy];

    RMQConnectionConfig *config = [[RMQConnectionConfig alloc] initWithCredentials:credentials
                                                                        channelMax:channelMax
                                                                          frameMax:frameMax
                                                                         heartbeat:heartbeat
                                                                             vhost:rmqURI.vhost
                                                                     authMechanism:tlsOptions.authMechanism
                                                                          recovery:recovery];

    // The allocator is passed twice: once as channel allocator, once as frame handler.
    return [self initWithTransport:transport
                            config:config
                  handshakeTimeout:syncTimeout
                  channelAllocator:allocator
                      frameHandler:allocator
                          delegate:delegateProxy
                      commandQueue:commandQueue
                     waiterFactory:waiterFactory
                   heartbeatSender:heartbeatSender];
}

/// Convenience initializer with a custom recovery interval. Uses TLS options
/// derived from the URI, RMQChannelLimit / RMQFrameMax, heartbeat 0, a sync
/// timeout of 10, the default-priority global queue for delegate callbacks,
/// an attempt limit of NSUIntegerMax, and recovery from connection close.
- (instancetype)initWithUri:(NSString *)uri
                   delegate:(id)delegate
               recoverAfter:(NSNumber *)recoveryInterval {
    return [self initWithUri:uri
                  tlsOptions:[RMQTLSOptions fromURI:uri]
                  channelMax:@(RMQChannelLimit)
                    frameMax:@(RMQFrameMax)
                   heartbeat:@0
                 syncTimeout:@10
                    delegate:delegate
               delegateQueue:dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)
                recoverAfter:recoveryInterval
            recoveryAttempts:@(NSUIntegerMax)
  recoverFromConnectionClose:YES];
}

/// Convenience initializer that applies the default recovery settings:
/// interval 4, attempt limit NSUIntegerMax, recover from connection close.
- (instancetype)initWithUri:(NSString *)uri
                 tlsOptions:(RMQTLSOptions *)tlsOptions
                 channelMax:(NSNumber *)channelMax
                   frameMax:(NSNumber *)frameMax
                  heartbeat:(NSNumber *)heartbeat
                syncTimeout:(NSNumber *)syncTimeout
                   delegate:(id)delegate
              delegateQueue:(dispatch_queue_t)delegateQueue {
    return [self initWithUri:uri
                  tlsOptions:tlsOptions
                  channelMax:channelMax
                    frameMax:frameMax
                   heartbeat:heartbeat
                 syncTimeout:syncTimeout
                    delegate:delegate
               delegateQueue:delegateQueue
                recoverAfter:@4
            recoveryAttempts:@(NSUIntegerMax)
  recoverFromConnectionClose:YES];
}

/// Convenience initializer that derives the TLS options from the URI.
- (instancetype)initWithUri:(NSString *)uri
                 channelMax:(NSNumber *)channelMax
                   frameMax:(NSNumber *)frameMax
                  heartbeat:(NSNumber *)heartbeat
                syncTimeout:(NSNumber *)syncTimeout
                   delegate:(id)delegate
              delegateQueue:(dispatch_queue_t)delegateQueue {
    return [self initWithUri:uri
                  tlsOptions:[RMQTLSOptions fromURI:uri]
                  channelMax:channelMax
                    frameMax:frameMax
                   heartbeat:heartbeat
                 syncTimeout:syncTimeout
                    delegate:delegate
               delegateQueue:delegateQueue];
}

/// Convenience initializer with explicit TLS options and default limits:
/// RMQChannelLimit, RMQFrameMax, heartbeat 0, sync timeout 10, and the
/// default-priority global queue for delegate callbacks.
- (instancetype)initWithUri:(NSString *)uri
                 tlsOptions:(RMQTLSOptions *)tlsOptions
                   delegate:(id)delegate {
    return [self initWithUri:uri
                  tlsOptions:tlsOptions
                  channelMax:@(RMQChannelLimit)
                    frameMax:@(RMQFrameMax)
                   heartbeat:@0
                 syncTimeout:@10
                    delegate:delegate
               delegateQueue:dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)];
}

/// Convenience initializer that builds TLS options from the URI and the
/// given peer-verification flag.
- (instancetype)initWithUri:(NSString *)uri
                 verifyPeer:(BOOL)verifyPeer
                   delegate:(id)delegate {
    RMQTLSOptions *tlsOptions = [RMQTLSOptions fromURI:uri verifyPeer:verifyPeer];
    return [self initWithUri:uri tlsOptions:tlsOptions delegate:delegate];
}

/// Convenience initializer using TLS options derived from the URI.
- (instancetype)initWithUri:(NSString *)uri
                   delegate:(id)delegate {
    return [self initWithUri:uri
                  tlsOptions:[RMQTLSOptions fromURI:uri]
                    delegate:delegate];
}

/// Convenience initializer targeting amqp://guest:guest@localhost.
- (instancetype)initWithDelegate:(id)delegate {
    return [self initWithUri:@"amqp://guest:guest@localhost" delegate:delegate];
}

/// Connects to amqp://guest:guest@localhost with no delegate.
- (instancetype)init {
    return [self initWithDelegate:nil];
}

#pragma mark - Lifecycle

/// Connects the transport, writes the protocol header and enqueues the
/// handshake on the command queue.
///
/// On a connect error the delegate receives
/// `connection:failedToConnectWithError:` and nothing else happens. When the
/// handshake finishes, the heartbeat sender is started with half of the
/// heartbeat timeout passed to the handshake completion (integer division),
/// the main reader is started and `completionHandler` is invoked. If the
/// handshake waiter times out, the delegate receives an
/// RMQErrorConnectionHandshakeTimedOut error.
///
/// @param completionHandler Invoked once the handshake completes. May be nil.
- (void)start:(void (^)())completionHandler {
    NSError *connectError = nil;
    [self.transport connectAndReturnError:&connectError];
    if (connectError) {
        [self.delegate connection:self failedToConnectWithError:connectError];
        return;
    }

    [self.transport write:[RMQProtocolHeader new].amqEncoded];

    [self.commandQueue enqueue:^{
        id handshakeCompletion = [self.waiterFactory makeWithTimeout:self.handshakeTimeout];

        RMQHandshaker *handshaker = [[RMQHandshaker alloc] initWithSender:self
                                                                   config:self.config
                                                        completionHandler:^(NSNumber *heartbeatTimeout) {
            [self.heartbeatSender startWithInterval:@(heartbeatTimeout.integerValue / 2)];
            self.handshakeComplete = YES;
            [handshakeCompletion done];
            [self.reader run];
            // Calling a nil block crashes, so tolerate callers that pass nil.
            if (completionHandler) {
                completionHandler();
            }
        }];

        // A dedicated reader feeds frames to the handshaker until the handshake is done.
        RMQReader *handshakeReader = [[RMQReader alloc] initWithTransport:self.transport
                                                             frameHandler:handshaker];
        handshaker.reader = handshakeReader;
        [handshakeReader run];

        if ([handshakeCompletion timesOut]) {
            NSError *error = [NSError errorWithDomain:RMQErrorDomain
                                                 code:RMQErrorConnectionHandshakeTimedOut
                                             userInfo:@{NSLocalizedDescriptionKey: @"Handshake timed out."}];
            [self.delegate connection:self failedToConnectWithError:error];
        }
    }];
}

/// Starts the connection with an empty completion handler.
- (void)start {
    [self start:^{}];
}

/// Allocates a channel, records it under its channel number, enqueues its
/// activation on the command queue, asks it to open and returns it.
- (id)createChannel {
    id ch = [self.channelAllocator allocate];
    self.userChannels[[ch channelNumber]] = ch;

    [self.commandQueue enqueue:^{
        [ch activateWithDelegate:self.delegate];
    }];

    [ch open];

    return ch;
}

/// Enqueues every close operation on the command queue without waiting.
- (void)close {
    for (RMQOperation operation in self.closeOperations) {
        [self.commandQueue enqueue:operation];
    }
}

/// Enqueues every close operation with the command queue's blocking variant.
- (void)blockingClose {
    for (RMQOperation operation in self.closeOperations) {
        [self.commandQueue blockingEnqueue:operation];
    }
}

#pragma mark - RMQSender

/// Writes the encoded frameset to the transport and signals activity to the
/// heartbeat sender. The frameset is dropped unless the handshake has
/// completed or `isForced` is YES.
- (void)sendFrameset:(RMQFrameset *)frameset
               force:(BOOL)isForced {
    if (!self.handshakeComplete && !isForced) {
        return;
    }
    [self.transport write:frameset.amqEncoded];
    [self.heartbeatSender signalActivity];
}

/// Sends a frameset without forcing; see -sendFrameset:force:.
- (void)sendFrameset:(RMQFrameset *)frameset {
    [self sendFrameset:frameset force:NO];
}

#pragma mark - RMQFrameHandler

/// Handles a frameset delivered by the main reader.
///
/// A connection-close method is answered with close-ok on channel 0, after
/// which the handshake flag is cleared and the transport is closed. Any other
/// frameset is forwarded to the frame handler and the reader is run again.
- (void)handleFrameset:(RMQFrameset *)frameset {
    id method = frameset.method;

    if ([method isKindOfClass:[RMQConnectionClose class]]) {
        [self sendFrameset:[[RMQFrameset alloc] initWithChannelNumber:@0
                                                               method:[RMQConnectionCloseOk new]]];
        self.handshakeComplete = NO;
        [self.transport close];
        // Re-register as transport delegate after closing, as the original code does.
        [self.transport setDelegate:self];
        return;
    }

    [self.frameHandler handleFrameset:frameset];
    [self.reader run];
}

#pragma mark - RMQTransportDelegate

/// Clears the handshake flag, reports a non-nil error to the delegate and
/// hands off to the configured recovery strategy (with or without an error).
- (void)transport:(id)transport disconnectedWithError:(NSError *)error {
    self.handshakeComplete = NO;
    if (error) {
        [self.delegate connection:self disconnectedWithError:error];
    }
    [self.recovery recover:self channelAllocator:self.channelAllocator error:error];
}

#pragma mark - Private

/// Ordered steps of a connection close: close all user channels, send
/// connection-close on channel 0, wait on channel zero for close-ok, stop the
/// heartbeat sender, then detach from the transport and close it.
- (NSArray *)closeOperations {
    return @[
        ^{ [self closeAllUserChannels]; },
        ^{ [self sendFrameset:[[RMQFrameset alloc] initWithChannelNumber:@0 method:self.amqClose]]; },
        ^{ [self.channelZero blockingWaitOn:[RMQConnectionCloseOk class]]; },
        ^{ [self.heartbeatSender stop]; },
        ^{
            // Detach first; presumably so a deliberate close is not reported
            // back through -transport:disconnectedWithError:.
            [self.transport setDelegate:nil];
            [self.transport close];
        },
    ];
}

/// Sends -blockingClose to every channel recorded in `userChannels`.
- (void)closeAllUserChannels {
    for (id ch in self.userChannels.allValues) {
        [ch blockingClose];
    }
}

/// Builds the connection-close method sent on close: reply code 200,
/// reply text "Goodbye", class id 0 and method id 0.
- (RMQConnectionClose *)amqClose {
    return [[RMQConnectionClose alloc] initWithReplyCode:[[RMQShort alloc] init:200]
                                               replyText:[[RMQShortstr alloc] init:@"Goodbye"]
                                                 classId:[[RMQShort alloc] init:0]
                                                methodId:[[RMQShort alloc] init:0]];
}

/// The recovery strategy stored in the connection config.
- (id)recovery {
    return self.config.recovery;
}

@end