]> Pileus Git - ~andy/linux/blob - drivers/staging/lustre/lustre/llite/xattr_cache.c
Merge branch 'merge' of git://git.kernel.org/pub/scm/linux/kernel/git/benh/powerpc
[~andy/linux] / drivers / staging / lustre / lustre / llite / xattr_cache.c
1 /*
2  * Copyright 2012 Xyratex Technology Limited
3  *
4  * Author: Andrew Perepechko <Andrew_Perepechko@xyratex.com>
5  *
6  */
7
8 #define DEBUG_SUBSYSTEM S_LLITE
9
10 #include <linux/fs.h>
11 #include <linux/sched.h>
12 #include <linux/mm.h>
13 #include <obd_support.h>
14 #include <lustre_lite.h>
15 #include <lustre_dlm.h>
16 #include <lustre_ver.h>
17 #include "llite_internal.h"
18
19 /* If we ever have hundreds of extended attributes, we might want to consider
20  * using a hash or a tree structure instead of list for faster lookups.
21  */
22 struct ll_xattr_entry {
23         struct list_head        xe_list;    /* protected with
24                                              * lli_xattrs_list_rwsem */
25         char                    *xe_name;   /* xattr name, \0-terminated */
26         char                    *xe_value;  /* xattr value */
27         unsigned                xe_namelen; /* strlen(xe_name) + 1 */
28         unsigned                xe_vallen;  /* xattr value length */
29 };
30
31 static struct kmem_cache *xattr_kmem;
32 static struct lu_kmem_descr xattr_caches[] = {
33         {
34                 .ckd_cache = &xattr_kmem,
35                 .ckd_name  = "xattr_kmem",
36                 .ckd_size  = sizeof(struct ll_xattr_entry)
37         },
38         {
39                 .ckd_cache = NULL
40         }
41 };
42
43 int ll_xattr_init(void)
44 {
45         return lu_kmem_init(xattr_caches);
46 }
47
48 void ll_xattr_fini(void)
49 {
50         lu_kmem_fini(xattr_caches);
51 }
52
53 /**
54  * Initializes xattr cache for an inode.
55  *
56  * This initializes the xattr list and marks cache presence.
57  */
58 static void ll_xattr_cache_init(struct ll_inode_info *lli)
59 {
60
61
62         LASSERT(lli != NULL);
63
64         INIT_LIST_HEAD(&lli->lli_xattrs);
65         lli->lli_flags |= LLIF_XATTR_CACHE;
66 }
67
68 /**
69  *  This looks for a specific extended attribute.
70  *
71  *  Find in @cache and return @xattr_name attribute in @xattr,
72  *  for the NULL @xattr_name return the first cached @xattr.
73  *
74  *  \retval 0        success
75  *  \retval -ENODATA if not found
76  */
77 static int ll_xattr_cache_find(struct list_head *cache,
78                                const char *xattr_name,
79                                struct ll_xattr_entry **xattr)
80 {
81         struct ll_xattr_entry *entry;
82
83
84
85         list_for_each_entry(entry, cache, xe_list) {
86                 /* xattr_name == NULL means look for any entry */
87                 if (xattr_name == NULL ||
88                     strcmp(xattr_name, entry->xe_name) == 0) {
89                         *xattr = entry;
90                         CDEBUG(D_CACHE, "find: [%s]=%.*s\n",
91                                entry->xe_name, entry->xe_vallen,
92                                entry->xe_value);
93                         return 0;
94                 }
95         }
96
97         return -ENODATA;
98 }
99
100 /**
101  * This adds or updates an xattr.
102  *
103  * Add @xattr_name attr with @xattr_val value and @xattr_val_len length,
104  * if the attribute already exists, then update its value.
105  *
106  * \retval 0       success
107  * \retval -ENOMEM if no memory could be allocated for the cached attr
108  */
109 static int ll_xattr_cache_add(struct list_head *cache,
110                               const char *xattr_name,
111                               const char *xattr_val,
112                               unsigned xattr_val_len)
113 {
114         struct ll_xattr_entry *xattr;
115
116
117
118         if (ll_xattr_cache_find(cache, xattr_name, &xattr) == 0) {
119                 /* Found a cached EA, update it */
120
121                 if (xattr_val_len != xattr->xe_vallen) {
122                         char *val;
123                         OBD_ALLOC(val, xattr_val_len);
124                         if (val == NULL) {
125                                 CDEBUG(D_CACHE,
126                                        "failed to allocate %u bytes for xattr %s update\n",
127                                        xattr_val_len, xattr_name);
128                                 return -ENOMEM;
129                         }
130                         OBD_FREE(xattr->xe_value, xattr->xe_vallen);
131                         xattr->xe_value = val;
132                         xattr->xe_vallen = xattr_val_len;
133                 }
134                 memcpy(xattr->xe_value, xattr_val, xattr_val_len);
135
136                 CDEBUG(D_CACHE, "update: [%s]=%.*s\n", xattr_name,
137                         xattr_val_len, xattr_val);
138
139                 return 0;
140         }
141
142         OBD_SLAB_ALLOC_PTR_GFP(xattr, xattr_kmem, __GFP_IO);
143         if (xattr == NULL) {
144                 CDEBUG(D_CACHE, "failed to allocate xattr\n");
145                 return -ENOMEM;
146         }
147
148         xattr->xe_namelen = strlen(xattr_name) + 1;
149
150         OBD_ALLOC(xattr->xe_name, xattr->xe_namelen);
151         if (!xattr->xe_name) {
152                 CDEBUG(D_CACHE, "failed to alloc xattr name %u\n",
153                        xattr->xe_namelen);
154                 goto err_name;
155         }
156         OBD_ALLOC(xattr->xe_value, xattr_val_len);
157         if (!xattr->xe_value) {
158                 CDEBUG(D_CACHE, "failed to alloc xattr value %d\n",
159                        xattr_val_len);
160                 goto err_value;
161         }
162
163         memcpy(xattr->xe_name, xattr_name, xattr->xe_namelen);
164         memcpy(xattr->xe_value, xattr_val, xattr_val_len);
165         xattr->xe_vallen = xattr_val_len;
166         list_add(&xattr->xe_list, cache);
167
168         CDEBUG(D_CACHE, "set: [%s]=%.*s\n", xattr_name,
169                 xattr_val_len, xattr_val);
170
171         return 0;
172 err_value:
173         OBD_FREE(xattr->xe_name, xattr->xe_namelen);
174 err_name:
175         OBD_SLAB_FREE_PTR(xattr, xattr_kmem);
176
177         return -ENOMEM;
178 }
179
180 /**
181  * This removes an extended attribute from cache.
182  *
183  * Remove @xattr_name attribute from @cache.
184  *
185  * \retval 0        success
186  * \retval -ENODATA if @xattr_name is not cached
187  */
188 static int ll_xattr_cache_del(struct list_head *cache,
189                               const char *xattr_name)
190 {
191         struct ll_xattr_entry *xattr;
192
193
194
195         CDEBUG(D_CACHE, "del xattr: %s\n", xattr_name);
196
197         if (ll_xattr_cache_find(cache, xattr_name, &xattr) == 0) {
198                 list_del(&xattr->xe_list);
199                 OBD_FREE(xattr->xe_name, xattr->xe_namelen);
200                 OBD_FREE(xattr->xe_value, xattr->xe_vallen);
201                 OBD_SLAB_FREE_PTR(xattr, xattr_kmem);
202
203                 return 0;
204         }
205
206         return -ENODATA;
207 }
208
209 /**
210  * This iterates cached extended attributes.
211  *
212  * Walk over cached attributes in @cache and
213  * fill in @xld_buffer or only calculate buffer
214  * size if @xld_buffer is NULL.
215  *
216  * \retval >= 0     buffer list size
217  * \retval -ENODATA if the list cannot fit @xld_size buffer
218  */
219 static int ll_xattr_cache_list(struct list_head *cache,
220                                char *xld_buffer,
221                                int xld_size)
222 {
223         struct ll_xattr_entry *xattr, *tmp;
224         int xld_tail = 0;
225
226
227
228         list_for_each_entry_safe(xattr, tmp, cache, xe_list) {
229                 CDEBUG(D_CACHE, "list: buffer=%p[%d] name=%s\n",
230                         xld_buffer, xld_tail, xattr->xe_name);
231
232                 if (xld_buffer) {
233                         xld_size -= xattr->xe_namelen;
234                         if (xld_size < 0)
235                                 break;
236                         memcpy(&xld_buffer[xld_tail],
237                                xattr->xe_name, xattr->xe_namelen);
238                 }
239                 xld_tail += xattr->xe_namelen;
240         }
241
242         if (xld_size < 0)
243                 return -ERANGE;
244
245         return xld_tail;
246 }
247
248 /**
249  * Check if the xattr cache is initialized (filled).
250  *
251  * \retval 0 @cache is not initialized
252  * \retval 1 @cache is initialized
253  */
254 int ll_xattr_cache_valid(struct ll_inode_info *lli)
255 {
256         return !!(lli->lli_flags & LLIF_XATTR_CACHE);
257 }
258
259 /**
260  * This finalizes the xattr cache.
261  *
262  * Free all xattr memory. @lli is the inode info pointer.
263  *
264  * \retval 0 no error occured
265  */
266 static int ll_xattr_cache_destroy_locked(struct ll_inode_info *lli)
267 {
268
269
270         if (!ll_xattr_cache_valid(lli))
271                 return 0;
272
273         while (ll_xattr_cache_del(&lli->lli_xattrs, NULL) == 0)
274                 ; /* empty loop */
275         lli->lli_flags &= ~LLIF_XATTR_CACHE;
276
277         return 0;
278 }
279
280 int ll_xattr_cache_destroy(struct inode *inode)
281 {
282         struct ll_inode_info *lli = ll_i2info(inode);
283         int rc;
284
285
286
287         down_write(&lli->lli_xattrs_list_rwsem);
288         rc = ll_xattr_cache_destroy_locked(lli);
289         up_write(&lli->lli_xattrs_list_rwsem);
290
291         return rc;
292 }
293
294 /**
295  * Match or enqueue a PR or PW LDLM lock.
296  *
297  * Find or request an LDLM lock with xattr data.
298  * Since LDLM does not provide API for atomic match_or_enqueue,
299  * the function handles it with a separate enq lock.
300  * If successful, the function exits with the list lock held.
301  *
302  * \retval 0       no error occured
303  * \retval -ENOMEM not enough memory
304  */
305 static int ll_xattr_find_get_lock(struct inode *inode,
306                                   struct lookup_intent *oit,
307                                   struct ptlrpc_request **req)
308 {
309         ldlm_mode_t mode;
310         struct lustre_handle lockh = { 0 };
311         struct md_op_data *op_data;
312         struct ll_inode_info *lli = ll_i2info(inode);
313         struct ldlm_enqueue_info einfo = { .ei_type = LDLM_IBITS,
314                                            .ei_mode = it_to_lock_mode(oit),
315                                            .ei_cb_bl = ll_md_blocking_ast,
316                                            .ei_cb_cp = ldlm_completion_ast };
317         struct ll_sb_info *sbi = ll_i2sbi(inode);
318         struct obd_export *exp = sbi->ll_md_exp;
319         int rc;
320
321
322
323         mutex_lock(&lli->lli_xattrs_enq_lock);
324         /* Try matching first. */
325         mode = ll_take_md_lock(inode, MDS_INODELOCK_XATTR, &lockh, 0,
326                                oit->it_op == IT_SETXATTR ? LCK_PW :
327                                                            (LCK_PR | LCK_PW));
328         if (mode != 0) {
329                 /* fake oit in mdc_revalidate_lock() manner */
330                 oit->d.lustre.it_lock_handle = lockh.cookie;
331                 oit->d.lustre.it_lock_mode = mode;
332                 goto out;
333         }
334
335         /* Enqueue if the lock isn't cached locally. */
336         op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
337                                      LUSTRE_OPC_ANY, NULL);
338         if (IS_ERR(op_data)) {
339                 mutex_unlock(&lli->lli_xattrs_enq_lock);
340                 return PTR_ERR(op_data);
341         }
342
343         op_data->op_valid = OBD_MD_FLXATTR | OBD_MD_FLXATTRLS |
344                             OBD_MD_FLXATTRLOCKED;
345 #ifdef CONFIG_FS_POSIX_ACL
346         /* If working with ACLs, we would like to cache local ACLs */
347         if (sbi->ll_flags & LL_SBI_RMT_CLIENT)
348                 op_data->op_valid |= OBD_MD_FLRMTLGETFACL;
349 #endif
350
351         rc = md_enqueue(exp, &einfo, oit, op_data, &lockh, NULL, 0, NULL, 0);
352         ll_finish_md_op_data(op_data);
353
354         if (rc < 0) {
355                 CDEBUG(D_CACHE,
356                        "md_intent_lock failed with %d for fid "DFID"\n",
357                        rc, PFID(ll_inode2fid(inode)));
358                 mutex_unlock(&lli->lli_xattrs_enq_lock);
359                 return rc;
360         }
361
362         *req = (struct ptlrpc_request *)oit->d.lustre.it_data;
363 out:
364         down_write(&lli->lli_xattrs_list_rwsem);
365         mutex_unlock(&lli->lli_xattrs_enq_lock);
366
367         return 0;
368 }
369
370 /**
371  * Refill the xattr cache.
372  *
373  * Fetch and cache the whole of xattrs for @inode, acquiring
374  * a read or a write xattr lock depending on operation in @oit.
375  * Intent is dropped on exit unless the operation is setxattr.
376  *
377  * \retval 0       no error occured
378  * \retval -EPROTO network protocol error
379  * \retval -ENOMEM not enough memory for the cache
380  */
381 static int ll_xattr_cache_refill(struct inode *inode, struct lookup_intent *oit)
382 {
383         struct ll_sb_info *sbi = ll_i2sbi(inode);
384         struct ptlrpc_request *req = NULL;
385         const char *xdata, *xval, *xtail, *xvtail;
386         struct ll_inode_info *lli = ll_i2info(inode);
387         struct mdt_body *body;
388         __u32 *xsizes;
389         int rc = 0, i;
390
391
392
393         rc = ll_xattr_find_get_lock(inode, oit, &req);
394         if (rc)
395                 GOTO(out_no_unlock, rc);
396
397         /* Do we have the data at this point? */
398         if (ll_xattr_cache_valid(lli)) {
399                 ll_stats_ops_tally(sbi, LPROC_LL_GETXATTR_HITS, 1);
400                 GOTO(out_maybe_drop, rc = 0);
401         }
402
403         /* Matched but no cache? Cancelled on error by a parallel refill. */
404         if (unlikely(req == NULL)) {
405                 CDEBUG(D_CACHE, "cancelled by a parallel getxattr\n");
406                 GOTO(out_maybe_drop, rc = -EIO);
407         }
408
409         if (oit->d.lustre.it_status < 0) {
410                 CDEBUG(D_CACHE, "getxattr intent returned %d for fid "DFID"\n",
411                        oit->d.lustre.it_status, PFID(ll_inode2fid(inode)));
412                 GOTO(out_destroy, rc = oit->d.lustre.it_status);
413         }
414
415         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
416         if (body == NULL) {
417                 CERROR("no MDT BODY in the refill xattr reply\n");
418                 GOTO(out_destroy, rc = -EPROTO);
419         }
420         /* do not need swab xattr data */
421         xdata = req_capsule_server_sized_get(&req->rq_pill, &RMF_EADATA,
422                                                 body->eadatasize);
423         xval = req_capsule_server_sized_get(&req->rq_pill, &RMF_EAVALS,
424                                                 body->aclsize);
425         xsizes = req_capsule_server_sized_get(&req->rq_pill, &RMF_EAVALS_LENS,
426                                               body->max_mdsize * sizeof(__u32));
427         if (xdata == NULL || xval == NULL || xsizes == NULL) {
428                 CERROR("wrong setxattr reply\n");
429                 GOTO(out_destroy, rc = -EPROTO);
430         }
431
432         xtail = xdata + body->eadatasize;
433         xvtail = xval + body->aclsize;
434
435         CDEBUG(D_CACHE, "caching: xdata=%p xtail=%p\n", xdata, xtail);
436
437         ll_xattr_cache_init(lli);
438
439         for (i = 0; i < body->max_mdsize; i++) {
440                 CDEBUG(D_CACHE, "caching [%s]=%.*s\n", xdata, *xsizes, xval);
441                 /* Perform consistency checks: attr names and vals in pill */
442                 if (memchr(xdata, 0, xtail - xdata) == NULL) {
443                         CERROR("xattr protocol violation (names are broken)\n");
444                         rc = -EPROTO;
445                 } else if (xval + *xsizes > xvtail) {
446                         CERROR("xattr protocol violation (vals are broken)\n");
447                         rc = -EPROTO;
448                 } else if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_XATTR_ENOMEM)) {
449                         rc = -ENOMEM;
450                 } else {
451                         rc = ll_xattr_cache_add(&lli->lli_xattrs, xdata, xval,
452                                                 *xsizes);
453                 }
454                 if (rc < 0) {
455                         ll_xattr_cache_destroy_locked(lli);
456                         GOTO(out_destroy, rc);
457                 }
458                 xdata += strlen(xdata) + 1;
459                 xval  += *xsizes;
460                 xsizes++;
461         }
462
463         if (xdata != xtail || xval != xvtail)
464                 CERROR("a hole in xattr data\n");
465
466         ll_set_lock_data(sbi->ll_md_exp, inode, oit, NULL);
467
468         GOTO(out_maybe_drop, rc);
469 out_maybe_drop:
470         /* drop lock on error or getxattr */
471         if (rc != 0 || oit->it_op != IT_SETXATTR)
472                 ll_intent_drop_lock(oit);
473
474         if (rc != 0)
475                 up_write(&lli->lli_xattrs_list_rwsem);
476 out_no_unlock:
477         ptlrpc_req_finished(req);
478
479         return rc;
480
481 out_destroy:
482         up_write(&lli->lli_xattrs_list_rwsem);
483
484         ldlm_lock_decref_and_cancel((struct lustre_handle *)
485                                         &oit->d.lustre.it_lock_handle,
486                                         oit->d.lustre.it_lock_mode);
487
488         goto out_no_unlock;
489 }
490
491 /**
492  * Get an xattr value or list xattrs using the write-through cache.
493  *
494  * Get the xattr value (@valid has OBD_MD_FLXATTR set) of @name or
495  * list xattr names (@valid has OBD_MD_FLXATTRLS set) for @inode.
496  * The resulting value/list is stored in @buffer if the former
497  * is not larger than @size.
498  *
499  * \retval 0        no error occured
500  * \retval -EPROTO  network protocol error
501  * \retval -ENOMEM  not enough memory for the cache
502  * \retval -ERANGE  the buffer is not large enough
503  * \retval -ENODATA no such attr or the list is empty
504  */
505 int ll_xattr_cache_get(struct inode *inode,
506                         const char *name,
507                         char *buffer,
508                         size_t size,
509                         __u64 valid)
510 {
511         struct lookup_intent oit = { .it_op = IT_GETXATTR };
512         struct ll_inode_info *lli = ll_i2info(inode);
513         int rc = 0;
514
515
516
517         LASSERT(!!(valid & OBD_MD_FLXATTR) ^ !!(valid & OBD_MD_FLXATTRLS));
518
519         down_read(&lli->lli_xattrs_list_rwsem);
520         if (!ll_xattr_cache_valid(lli)) {
521                 up_read(&lli->lli_xattrs_list_rwsem);
522                 rc = ll_xattr_cache_refill(inode, &oit);
523                 if (rc)
524                         return rc;
525                 downgrade_write(&lli->lli_xattrs_list_rwsem);
526         } else {
527                 ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_GETXATTR_HITS, 1);
528         }
529
530         if (valid & OBD_MD_FLXATTR) {
531                 struct ll_xattr_entry *xattr;
532
533                 rc = ll_xattr_cache_find(&lli->lli_xattrs, name, &xattr);
534                 if (rc == 0) {
535                         rc = xattr->xe_vallen;
536                         /* zero size means we are only requested size in rc */
537                         if (size != 0) {
538                                 if (size >= xattr->xe_vallen)
539                                         memcpy(buffer, xattr->xe_value,
540                                                 xattr->xe_vallen);
541                                 else
542                                         rc = -ERANGE;
543                         }
544                 }
545         } else if (valid & OBD_MD_FLXATTRLS) {
546                 rc = ll_xattr_cache_list(&lli->lli_xattrs,
547                                          size ? buffer : NULL, size);
548         }
549
550         GOTO(out, rc);
551 out:
552         up_read(&lli->lli_xattrs_list_rwsem);
553
554         return rc;
555 }
556
557
558 /**
559  * Set/update an xattr value or remove xattr using the write-through cache.
560  *
561  * Set/update the xattr value (if @valid has OBD_MD_FLXATTR) of @name to @newval
562  * or
563  * remove the xattr @name (@valid has OBD_MD_FLXATTRRM set) from @inode.
564  * @flags is either XATTR_CREATE or XATTR_REPLACE as defined by setxattr(2)
565  *
566  * \retval 0        no error occured
567  * \retval -EPROTO  network protocol error
568  * \retval -ENOMEM  not enough memory for the cache
569  * \retval -ERANGE  the buffer is not large enough
570  * \retval -ENODATA no such attr (in the removal case)
571  */
572 int ll_xattr_cache_update(struct inode *inode,
573                         const char *name,
574                         const char *newval,
575                         size_t size,
576                         __u64 valid,
577                         int flags)
578 {
579         struct lookup_intent oit = { .it_op = IT_SETXATTR };
580         struct ll_sb_info *sbi = ll_i2sbi(inode);
581         struct ptlrpc_request *req = NULL;
582         struct ll_inode_info *lli = ll_i2info(inode);
583         struct obd_capa *oc;
584         int rc;
585
586
587
588         LASSERT(!!(valid & OBD_MD_FLXATTR) ^ !!(valid & OBD_MD_FLXATTRRM));
589
590         rc = ll_xattr_cache_refill(inode, &oit);
591         if (rc)
592                 return rc;
593
594         oc = ll_mdscapa_get(inode);
595         rc = md_setxattr(sbi->ll_md_exp, ll_inode2fid(inode), oc,
596                         valid | OBD_MD_FLXATTRLOCKED, name, newval,
597                         size, 0, flags, ll_i2suppgid(inode), &req);
598         capa_put(oc);
599
600         if (rc) {
601                 ll_intent_drop_lock(&oit);
602                 GOTO(out, rc);
603         }
604
605         if (valid & OBD_MD_FLXATTR)
606                 rc = ll_xattr_cache_add(&lli->lli_xattrs, name, newval, size);
607         else if (valid & OBD_MD_FLXATTRRM)
608                 rc = ll_xattr_cache_del(&lli->lli_xattrs, name);
609
610         ll_intent_drop_lock(&oit);
611         GOTO(out, rc);
612 out:
613         up_write(&lli->lli_xattrs_list_rwsem);
614         ptlrpc_req_finished(req);
615
616         return rc;
617 }