1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 """
17 Contains AMQP message producer classes.
18 """
19
20 from pulp.messaging import *
21 from pulp.messaging.endpoint import Endpoint
22 from qpid.messaging import Message
23 from logging import getLogger
24
25 log = getLogger(__name__)
26
27
29 """
30 An AMQP (abstract) message producer.
31 """
32
34 """
35 Open and configure the producer.
36 """
37 pass
38
39 - def send(self, destination, **body):
40 """
41 Send a message.
42 @param destination: An AMQP destination.
43 @type destination: L{Destination}
44 @keyword body: envelope body.
45 @return: The message serial number.
46 @rtype: str
47 """
48 sn = getuuid()
49 envelope = Envelope(sn=sn, version=version, origin=self.id())
50 envelope.update(body)
51 message = Message(content=envelope.dump(), durable=True)
52 address = str(destination)
53 sender = self.session().sender(address)
54 sender.send(message);
55 log.info('{%s} sent (%s)\n%s', self.id(), address, envelope)
56 return sn
57
59 """
60 Broadcast a message to (N) queues.
61 @param destinations: A list of AMQP destinations.
62 @type destinations: [L{Destination},..]
63 @keyword body: envelope body.
64 @return: A list of (addr,sn).
65 @rtype: list
66 """
67 sns = []
68 for dst in destinations:
69 sn = Producer.send(self, str(dst), **body)
70 sns.append((repr(dst),sn))
71 return sns
72
73
75 """
76 Event producer.
77 """
78
79 - def send(self, subject, event):
80 """
81 Send an event.
82 @param subject: A subject.
83 @type subject: str
84 @param event: The event body
85 @type event: object
86 """
87 destination = Topic('event', subject)
88 Producer.send(self, destination, event=event)
89