libsyncml  0.5.4
sml_queue.c
1 /*
2  * libsyncml - A syncml protocol implementation
3  * Copyright (C) 2005 Armin Bauer <armin.bauer@opensync.org>
4  * Copyright (C) 2009 Michael Bell <michael.bell@opensync.org>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  */
21 
22 #include "syncml.h"
23 
24 #include "syncml_internals.h"
25 #include "sml_queue_internals.h"
26 #include "sml_error_internals.h"
27 
28 #ifndef DOXYGEN_SHOULD_SKIP_THIS
29 
30 static gboolean _queue_prepare(GSource *source, gint *timeout_)
31 {
32  smlTrace(TRACE_INTERNAL, "%s(%p, %p)", __func__, source, timeout_);
33  *timeout_ = 1;
34  return FALSE;
35 }
36 
37 /* TODO only use me for debugging */
38 void smlQueueAssert(SmlQueue *queue)
39 {
40  if (queue->tail) {
41  smlAssert(queue->head);
42  }
43  if (queue->prio) {
44  smlAssert(queue->head);
45  }
46  if (queue->head) {
47  smlAssert(queue->tail);
48  }
49 
50  if (g_list_length(queue->head) == 1) {
51  smlAssert(queue->tail == queue->head);
52  }
53 
54  smlAssert(g_list_last(queue->head) == queue->tail);
55 }
56 
57 static gboolean _queue_check(GSource *source)
58 {
59  SmlQueue *queue = *((SmlQueue **)(source + 1));
60  return smlQueueCheck(queue);
61 }
62 
63 static gboolean _queue_dispatch(GSource *source, GSourceFunc callback, gpointer user_data)
64 {
65  smlTrace(TRACE_INTERNAL, "%s(%p, %p, %p)", __func__, source, callback, user_data);
66  SmlQueue *queue = user_data;
67  smlAssert(queue->handler);
68 
69  while (smlQueueCheck(queue))
70  smlQueueDispatch(queue);
71 
72  return TRUE;
73 }
74 #endif /* DOXYGEN_SHOULD_SKIP_THIS */
75 
82 {
83  CHECK_ERROR_REF
84  SmlQueue *queue = smlTryMalloc0(sizeof(SmlQueue), error);
85  if (!queue)
86  return NULL;
87 
88  if (!g_thread_supported ()) g_thread_init (NULL);
89 
90  queue->mutex = g_mutex_new();
91 
92  return queue;
93 }
94 
95 void smlQueueFree(SmlQueue *queue)
96 {
97  if (queue->source)
98  smlQueueDetach(queue);
99 
100  if (queue->head)
101  g_list_free(queue->head);
102 
103  g_mutex_free(queue->mutex);
104 
105  smlSafeFree((gpointer *)&queue);
106 }
107 
108 void smlQueueFlush(SmlQueue *queue)
109 {
110  g_mutex_lock(queue->mutex);
111 
112  smlQueueAssert(queue);
113 
114  queue->tail = NULL;
115  g_list_free(queue->head);
116  queue->head = NULL;
117  queue->prio = NULL;
118 
119  smlQueueAssert(queue);
120 
121  g_mutex_unlock(queue->mutex);
122 }
123 
124 SmlBool smlQueueCheckPrio(SmlQueue *queue)
125 {
126  g_mutex_lock(queue->mutex);
127  SmlBool ret = (queue->prio == NULL) ? FALSE : TRUE;
128  g_mutex_unlock(queue->mutex);
129 
130  return ret;
131 }
132 
133 SmlBool smlQueueCheck(SmlQueue *queue)
134 {
135  g_mutex_lock(queue->mutex);
136  SmlBool ret = (queue->head == NULL) ? FALSE : TRUE;
137  g_mutex_unlock(queue->mutex);
138 
139  return ret;
140 }
141 
142 unsigned int smlQueueLength(SmlQueue *queue)
143 {
144  g_mutex_lock(queue->mutex);
145  unsigned int ret = g_list_length(queue->head);
146  g_mutex_unlock(queue->mutex);
147 
148  return ret;
149 }
150 
151 unsigned int smlQueueLengthPrio(SmlQueue *queue)
152 {
153  g_mutex_lock(queue->mutex);
154  unsigned int ret = g_list_length(queue->prio);
155  g_mutex_unlock(queue->mutex);
156 
157  return ret;
158 }
159 
160 void *smlQueueTryPop(SmlQueue *queue)
161 {
162  smlAssert(queue);
163  void *message = NULL;
164 
165  g_mutex_lock(queue->mutex);
166 
167  smlQueueAssert(queue);
168 
169  if (queue->head) {
170  message = queue->head->data;
171  if (queue->head == queue->tail)
172  queue->tail = NULL;
173  if (queue->prio && message == queue->prio->data)
174  queue->prio = g_list_delete_link(queue->prio, queue->prio);
175  queue->head = g_list_delete_link(queue->head, queue->head);
176  }
177 
178  smlQueueAssert(queue);
179 
180  g_mutex_unlock(queue->mutex);
181 
182  return message;
183 }
184 
185 void *smlQueueTryPopPrio(SmlQueue *queue)
186 {
187  smlAssert(queue);
188  void *message = NULL;
189 
190  g_mutex_lock(queue->mutex);
191 
192  smlQueueAssert(queue);
193 
194  message = queue->prio ? queue->prio->data : NULL;
195  queue->prio = g_list_delete_link(queue->prio, queue->prio);
196 
197  if (message) {
198  queue->head = g_list_remove(queue->head, message);
199  queue->tail = g_list_last(queue->head);
200  }
201 
202  smlQueueAssert(queue);
203 
204  g_mutex_unlock(queue->mutex);
205 
206  return message;
207 }
208 
209 void smlQueueLock(SmlQueue *queue)
210 {
211  smlAssert(queue);
212  g_mutex_lock(queue->mutex);
213 }
214 
215 void smlQueueUnlock(SmlQueue *queue)
216 {
217  smlAssert(queue);
218  g_mutex_unlock(queue->mutex);
219 }
220 
221 void *smlQueuePeek(SmlQueue *queue)
222 {
223  smlAssert(queue);
224  return queue->head ? queue->head->data : NULL;
225 }
226 
227 void *smlQueuePeekPrio(SmlQueue *queue)
228 {
229  smlAssert(queue);
230  void *message = NULL;
231 
232  g_mutex_lock(queue->mutex);
233 
234  message = queue->prio ? queue->prio->data : NULL;
235 
236  smlQueueAssert(queue);
237 
238  g_mutex_unlock(queue->mutex);
239 
240  return message;
241 }
242 
243 void smlQueuePrint(SmlQueue *queue)
244 {
245  smlAssert(queue);
246 
247  g_mutex_lock(queue->mutex);
248 
249  GString *info = g_string_new("Contents of queue ");
250  g_string_append_printf(info, "%p:", queue);
251 
252  GList *m = NULL;
253  for (m = queue->head; m; m = m->next) {
254  g_string_append_printf(info, ", %p (list %p)", m->data, m);
255  }
256  smlTrace(TRACE_INTERNAL, "%s: %s", __func__, VA_STRING(info->str));
257  g_string_free(info, TRUE);
258 
259  info = g_string_new("Contents of prio queue:");
260  for (m = queue->prio; m; m = m->next) {
261  g_string_append_printf(info, ", %p (list %p)", m->data, m);
262  }
263  smlTrace(TRACE_INTERNAL, "%s: %s", __func__, VA_STRING(info->str));
264  g_string_free(info, TRUE);
265  smlTrace(TRACE_INTERNAL, "%s: Tail of queue: %p (list %p)", __func__, queue->tail ? queue->tail->data : NULL, queue->tail);
266 
267  g_mutex_unlock(queue->mutex);
268 }
269 
270 GList *smlQueuePeekNext(SmlQueue *queue, GList *prev)
271 {
272  smlAssert(queue);
273 
274  if (!prev)
275  return queue->head;
276  else
277  return prev->next;
278 }
279 
280 void smlQueuePushHeadPrio(SmlQueue *queue, void *data)
281 {
282  smlAssert(queue);
283  smlAssert(data);
284 
285  g_mutex_lock(queue->mutex);
286 
287  smlQueueAssert(queue);
288 
289  queue->head = g_list_prepend(queue->head, data);
290  if (!queue->tail)
291  queue->tail = queue->head;
292  queue->prio = g_list_prepend(queue->prio, data);
293 
294  smlQueueAssert(queue);
295 
296  g_mutex_unlock(queue->mutex);
297 }
298 
299 void smlQueuePushHead(SmlQueue *queue, void *data)
300 {
301  smlAssert(queue);
302  smlAssert(data);
303 
304  g_mutex_lock(queue->mutex);
305 
306  smlQueueAssert(queue);
307 
308  queue->head = g_list_prepend(queue->head, data);
309  if (!queue->tail)
310  queue->tail = queue->head;
311 
312  smlQueueAssert(queue);
313 
314  g_mutex_unlock(queue->mutex);
315 }
316 
323 void smlQueueSendPrio(SmlQueue *queue, void *data)
324 {
325  smlAssert(queue);
326  smlAssert(data);
327 
328  g_mutex_lock(queue->mutex);
329 
330  smlQueueAssert(queue);
331 
332  if (queue->tail) {
333  queue->tail = g_list_append(queue->tail, data);
334  queue->tail = queue->tail->next;
335  } else {
336  queue->head = g_list_append(queue->head, data);
337  queue->tail = queue->head;
338  }
339  queue->prio = g_list_append(queue->prio, data);
340 
341  smlQueueAssert(queue);
342 
343  g_mutex_unlock(queue->mutex);
344 }
345 
346 
353 void smlQueueSend(SmlQueue *queue, void *data)
354 {
355  smlAssert(queue);
356  smlAssert(data);
357 
358  g_mutex_lock(queue->mutex);
359 
360  smlQueueAssert(queue);
361 
362  if (queue->tail) {
363  queue->tail = g_list_append(queue->tail, data);
364  queue->tail = queue->tail->next;
365  } else {
366  queue->head = g_list_append(queue->head, data);
367  queue->tail = queue->head;
368  }
369 
370  smlQueueAssert(queue);
371 
372  g_mutex_unlock(queue->mutex);
373 }
374 
384 void smlQueueSetHandler(SmlQueue *queue, SmlQueueHandler handler, void *userdata)
385 {
386  queue->handler = handler;
387  queue->userdata = userdata;
388 }
389 
400 void smlQueueAttach(SmlQueue *queue, GMainContext *context)
401 {
402  smlTrace(TRACE_ENTRY, "%s(%p, %p)", __func__, queue, context);
403 
404  smlAssert(queue);
405  smlAssert(queue->source == NULL);
406 
407  GSourceFuncs *functions = g_malloc0(sizeof(GSourceFuncs));
408  functions->prepare = _queue_prepare;
409  functions->check = _queue_check;
410  functions->dispatch = _queue_dispatch;
411  functions->finalize = NULL;
412 
413  GSource *source = g_source_new(functions, sizeof(GSource) + sizeof(SmlQueue *));
414  SmlQueue **queueptr = (SmlQueue **)(source + 1);
415  *queueptr = queue;
416  g_source_set_callback(source, NULL, queue, NULL);
417  queue->source = source;
418  queue->functions = functions;
419  g_source_attach(source, context);
420  queue->context = context;
421  if (context)
422  g_main_context_ref(context);
423 
424  smlTrace(TRACE_EXIT, "%s", __func__);
425 }
426 
427 void smlQueueDetach(SmlQueue *queue)
428 {
429  smlTrace(TRACE_ENTRY, "%s(%p)", __func__, queue);
430 
431  smlAssert(queue);
432  smlAssert(queue->source);
433 
434  g_source_destroy(queue->source);
435  g_source_unref(queue->source);
436 
437  smlSafeFree((gpointer *)&(queue->functions));
438 
439  queue->source = NULL;
440 
441  if (queue->context) {
442  g_main_context_unref(queue->context);
443  queue->context = NULL;
444  }
445 
446  smlTrace(TRACE_EXIT, "%s", __func__);
447 }
448 
449 SmlBool smlQueueIsAttached(SmlQueue *queue)
450 {
451  return queue->source ? TRUE : FALSE;
452 }
453 
454 void smlQueueDispatch(SmlQueue *queue)
455 {
456  void *message = smlQueueTryPop(queue);
457  if (message)
458  queue->handler(message, queue->userdata);
459 }
460 
461 void smlQueueDispatchPrio(SmlQueue *queue)
462 {
463 
464  void *message = smlQueueTryPopPrio(queue);
465  if (message)
466  queue->handler(message, queue->userdata);
467 }
468 
SmlQueueHandler handler
void smlQueueSend(SmlQueue *queue, void *data)
Sends a message down a queue.
Definition: sml_queue.c:353
void smlQueueSetHandler(SmlQueue *queue, SmlQueueHandler handler, void *userdata)
Sets the message handler for a queue.
Definition: sml_queue.c:384
SmlQueue * smlQueueNew(SmlError **error)
Creates a new asynchronous queue.
Definition: sml_queue.c:81
GSource * source
Represents a Queue which can be used to receive messages.
GMainContext * context
void smlTrace(SmlTraceType type, const char *message,...)
Used for tracing the application.
Definition: sml_support.c:120
void smlQueueAttach(SmlQueue *queue, GMainContext *context)
Sets the queue to use the gmainloop with the given context.
Definition: sml_queue.c:400
void * smlTryMalloc0(long n_bytes, SmlError **error)
Safely mallocs.
Definition: sml_support.c:335
Represent an error.
void smlQueueSendPrio(SmlQueue *queue, void *data)
Sends a message down a queue.
Definition: sml_queue.c:323