单军华
2016-12-09 1a11fb042ac01b8c045d48e6ee3abbf153bf1c36
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
//
//  ViewController.m
//  RabbitMQ
//
//  Created by WindShan on 2016/12/9.
//  Copyright © 2016年 WindShan. All rights reserved.
//
 
#import "ViewController.h"
#import "RMQClient.h"
 
@interface ViewController ()
 
@end
 
@implementation ViewController
 
- (void)viewDidLoad
{
    [super viewDidLoad];
    
    [self workerNamed:@"Jack"];
    [self workerNamed:@"Jill"];
    sleep(1);
    [self newTask:@"Hello World..."];
    [self newTask:@"Just one this time."];
    [self newTask:@"Five....."];
    [self newTask:@"None"];
    [self newTask:@"Two..dots"];
    // Do any additional setup after loading the view, typically from a nib.
}
 
 
- (void)didReceiveMemoryWarning {
    [super didReceiveMemoryWarning];
    // Dispose of any resources that can be recreated.
}
 
- (void)newTask:(NSString *)msg
{
    NSLog(@"Attempting to connect to local RabbitMQ broker");
 
    RMQConnection *conn = [[RMQConnection alloc] initWithUri:@"amqp://myrabbitserver.com:1234"
                                                    delegate:[RMQConnectionDelegateLogger new]];
    //RMQConnection *conn = [[RMQConnection alloc] initWithDelegate:[RMQConnectionDelegateLogger new]];
    [conn start];
    
    id<RMQChannel> ch = [conn createChannel];
    
    RMQQueue *q = [ch queue:@"task_queue" options:RMQQueueDeclareDurable];
    
    NSData *msgData = [msg dataUsingEncoding:NSUTF8StringEncoding];
    [ch.defaultExchange publish:msgData routingKey:q.name persistent:YES];
    NSLog(@"Sent %@", msg);
    
    [conn close];
}
 
- (void)workerNamed:(NSString *)name
{
    RMQConnection *conn = [[RMQConnection alloc] initWithUri:@"amqp://myrabbitserver.com:1234"
                                                    delegate:[RMQConnectionDelegateLogger new]];
 
    //RMQConnection *conn = [[RMQConnection alloc] initWithDelegate:[RMQConnectionDelegateLogger new]];
    [conn start];
    
    id<RMQChannel> ch = [conn createChannel];
        
    RMQQueue *q = [ch queue:@"task_queue" options:RMQQueueDeclareDurable];
    
    [ch basicQos:@1 global:NO];
    NSLog(@"%@: Waiting for messages", name);
    
    RMQBasicConsumeOptions manualAck = RMQBasicConsumeNoOptions;
    [q subscribe:manualAck handler:^(RMQMessage * _Nonnull message)
    {
        NSString *messageText = [[NSString alloc] initWithData:message.body encoding:NSUTF8StringEncoding];
        NSLog(@"%@: Received %@", name, messageText);
        // imitate some work
        unsigned int sleepTime = (unsigned int)[messageText componentsSeparatedByString:@"."].count - 1;
        NSLog(@"%@: Sleeping for %u seconds", name, sleepTime);
        sleep(sleepTime);
        
        [ch ack:message.deliveryTag];
    }];
}
 
@end