source: wiki-toolkit/trunk/lib/Wiki/Toolkit/Store/Database.pm @ 424

Revision 424, 64.9 KB checked in by dom, 6 years ago (diff)

whitespace-only change to fix some POD bugs and generally make things read
more nicely. Tab-damage fixing still todo...

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1package Wiki::Toolkit::Store::Database;
2
3use strict;
4
5use vars qw( $VERSION $timestamp_fmt );
6$timestamp_fmt = "%Y-%m-%d %H:%M:%S";
7
8use DBI;
9use Time::Piece;
10use Time::Seconds;
11use Carp qw( carp croak );
12use Digest::MD5 qw( md5_hex );
13
14$VERSION = '0.29';
15my $SCHEMA_VER = 9;
16
17# first, detect if Encode is available - it's not under 5.6. If we _are_
18# under 5.6, give up - we'll just have to hope that nothing explodes. This
19# is the current 0.54 behaviour, so that's ok.
20
21my $CAN_USE_ENCODE;
22BEGIN {
23  eval " use Encode ";
24  $CAN_USE_ENCODE = $@ ? 0 : 1;
25}
26
27=head1 NAME
28
29Wiki::Toolkit::Store::Database - parent class for database storage backends
30for Wiki::Toolkit
31
32=head1 SYNOPSIS
33
34This is probably only useful for Wiki::Toolkit developers.
35
36  # See below for parameter details.
37  my $store = Wiki::Toolkit::Store::MySQL->new( %config );
38
39=head1 METHODS
40
41=over 4
42
43=item B<new>
44
45  my $store = Wiki::Toolkit::Store::MySQL->new( dbname  => "wiki",
46                        dbuser  => "wiki",
47                        dbpass  => "wiki",
48                        dbhost  => "db.example.com",
49                        dbport  => 1234,
50                        charset => "iso-8859-1" );
51or
52
53  my $store = Wiki::Toolkit::Store::MySQL->new( dbh => $dbh );
54
55C<charset> is optional, defaults to C<iso-8859-1>, and does nothing
56unless you're using perl 5.8 or newer.
57
58If you do not provide an active database handle in C<dbh>, then
59C<dbname> is mandatory. C<dbpass>, C<dbuser>, C<dbhost> and C<dbport>
60are optional, but you'll want to supply them unless your database's
61connection method doesn't require them.
62
63If you do provide C<database> then it must have the following
64parameters set; otherwise you should just provide the connection
65information and let us create our own handle:
66
67=over 4
68
69=item *
70
71C<RaiseError> = 1
72
73=item *
74
75C<PrintError> = 0
76
77=item *
78
79C<AutoCommit> = 1
80
81=back
82
83=cut
84
85sub new {
86    my ($class, @args) = @_;
87    my $self = {};
88    bless $self, $class;
89    return $self->_init(@args);
90}
91
92sub _init {
93    my ($self, %args) = @_;
94
95    if ( $args{dbh} ) {
96        $self->{_dbh} = $args{dbh};
97        $self->{_external_dbh} = 1; # don't disconnect at DESTROY time
98    } else {
99        die "Must supply a dbname" unless defined $args{dbname};
100        $self->{_dbname} = $args{dbname};
101        $self->{_dbuser} = $args{dbuser} || "";
102        $self->{_dbpass} = $args{dbpass} || "";
103        $self->{_dbhost} = $args{dbhost} || "";
104        $self->{_dbport} = $args{dbport} || "";
105        $self->{_charset} = $args{charset} || "iso-8859-1";
106
107        # Connect to database and store the database handle.
108        my ($dbname, $dbuser, $dbpass, $dbhost, $dbport) =
109                               @$self{qw(_dbname _dbuser _dbpass _dbhost _dbport)};
110        my $dsn = $self->_dsn($dbname, $dbhost, $dbport)
111            or croak "No data source string provided by class";
112        $self->{_dbh} = DBI->connect( $dsn, $dbuser, $dbpass,
113                                      { PrintError => 0, RaiseError => 1,
114                                        AutoCommit => 1 } )
115          or croak "Can't connect to database $dbname using $dsn: "
116                   . DBI->errstr;
117    }
118
119    my ($cur_ver, $db_ver) = $self->schema_current;
120    if ($db_ver < $cur_ver) {
121        croak "Database schema version $db_ver is too old (need $cur_ver)";
122    } elsif ($db_ver > $cur_ver) {
123        croak "Database schema version $db_ver is too new (need $cur_ver)";
124    }
125
126    return $self;
127}
128
129# Internal method, used to handle the logic of how to add up return
130#  values from pre_ plugins
131sub handle_pre_plugin_ret {
132        my ($running_total_ref,$result) = @_;
133
134        if(($result && $result == 0) || !$result) {
135                # No opinion, no need to change things
136        } elsif($result == -1 || $result == 1) {
137                # Increase or decrease as requested
138                $$running_total_ref += $result;
139        } else {
140                # Invalid return code
141                warn("Pre_ plugin returned invalid accept/deny value of '$result'");
142        }
143}
144
145=item B<retrieve_node>
146
147  my $content = $store->retrieve_node($node);
148
149  # Or get additional meta-data too.
150  my %node = $store->retrieve_node("HomePage");
151  print "Current Version: " . $node{version};
152
153  # Maybe we stored some metadata too.
154  my $categories = $node{metadata}{category};
155  print "Categories: " . join(", ", @$categories);
156  print "Postcode: $node{metadata}{postcode}[0]";
157
158  # Or get an earlier version:
159  my %node = $store->retrieve_node(name    => "HomePage",
160                                     version => 2 );
161  print $node{content};
162
163
164In scalar context, returns the current (raw Wiki language) contents of
165the specified node. In list context, returns a hash containing the
166contents of the node plus additional data:
167
168=over 4
169
170=item B<last_modified>
171
172=item B<version>
173
174=item B<checksum>
175
176=item B<metadata> - a reference to a hash containing any caller-supplied
177metadata sent along the last time the node was written
178
179The node parameter is mandatory. The version parameter is optional and
180defaults to the newest version. If the node hasn't been created yet,
181it is considered to exist but be empty (this behaviour might change).
182
183B<Note> on metadata - each hash value is returned as an array ref,
184even if that type of metadata only has one value.
185
186=cut
187
188sub retrieve_node {
189    my $self = shift;
190    my %args = scalar @_ == 1 ? ( name => $_[0] ) : @_;
191        unless($args{'version'}) { $args{'version'} = undef; }
192
193    # Call pre_retrieve on any plugins, in case they want to tweak anything
194    my @plugins = @{ $args{plugins} || [ ] };
195    foreach my $plugin (@plugins) {
196        if ( $plugin->can( "pre_retrieve" ) ) {
197            $plugin->pre_retrieve( 
198                                node     => \$args{'name'},
199                                version  => \$args{'version'}
200                        );
201        }
202    }
203
204    # Note _retrieve_node_data is sensitive to calling context.
205        unless(wantarray) {
206                # Scalar context, will return just the content
207                return $self->_retrieve_node_data( %args );
208        }
209
210    my %data = $self->_retrieve_node_data( %args );
211        $data{'checksum'} = $self->_checksum(%data);
212    return %data;
213}
214
215# Returns hash or scalar depending on calling context.
216sub _retrieve_node_data {
217    my ($self, %args) = @_;
218    my %data = $self->_retrieve_node_content( %args );
219        unless(wantarray) {
220                # Scalar context, return just the content
221                return $data{content};
222        }
223
224    # If we want additional data then get it.  Note that $data{version}
225    # will already have been set by C<_retrieve_node_content>, if it wasn't
226    # specified in the call.
227    my $dbh = $self->dbh;
228    my $sql = "SELECT metadata_type, metadata_value "
229         . "FROM node "
230         . "INNER JOIN metadata ON (node_id = id) "
231         . "WHERE name=? "
232         . "AND metadata.version=?";
233    my $sth = $dbh->prepare($sql);
234    $sth->execute($args{name},$data{version}) or croak $dbh->errstr;
235
236    my %metadata;
237    while ( my ($type, $val) = $self->charset_decode( $sth->fetchrow_array ) ) {
238        if ( defined $metadata{$type} ) {
239                push @{$metadata{$type}}, $val;
240            } else {
241            $metadata{$type} = [ $val ];
242        }
243    }
244    $data{metadata} = \%metadata;
245    return %data;
246}
247
248# $store->_retrieve_node_content( name    => $node_name,
249#                                 version => $node_version );
250# Params: 'name' is compulsory, 'version' is optional and defaults to latest.
251# Returns a hash of data for C<retrieve_node> - content, version, last modified
252sub _retrieve_node_content {
253    my ($self, %args) = @_;
254    croak "No valid node name supplied" unless $args{name};
255    my $dbh = $self->dbh;
256    my $sql;
257
258        my $version_sql_val;
259        my $text_source;
260    if ( $args{version} ) {
261                # Version given - get that version, and the content for that version
262                $version_sql_val = $dbh->quote($self->charset_encode($args{version}));
263                $text_source = "content";
264        } else {
265                # No version given, grab latest version (and content for that)
266                $version_sql_val = "node.version";
267                $text_source = "node";
268        }
269    $sql = "SELECT "
270         . "     $text_source.text, content.version, "
271         . "     content.modified, content.moderated, "
272         . "     node.moderate "
273         . "FROM node "
274         . "INNER JOIN content ON (id = node_id) "
275         . "WHERE name=" . $dbh->quote($self->charset_encode($args{name}))
276         . " AND content.version=" . $version_sql_val;
277    my @results = $self->charset_decode( $dbh->selectrow_array($sql) );
278    @results = ("", 0, "") unless scalar @results;
279    my %data;
280    @data{ qw( content version last_modified moderated node_requires_moderation ) } = @results;
281    return %data;
282}
283
284# Expects a hash as returned by ->retrieve_node
285sub _checksum {
286    my ($self, %node_data) = @_;
287    my $string = $node_data{content};
288    my %metadata = %{ $node_data{metadata} || {} };
289    foreach my $key ( sort keys %metadata ) {
290        $string .= "\0\0\0" . $key . "\0\0"
291                 . join("\0", sort @{$metadata{$key}} );
292    }
293    return md5_hex($self->charset_encode($string));
294}
295
296# Expects an array of hashes whose keys and values are scalars.
297sub _checksum_hashes {
298    my ($self, @hashes) = @_;
299    my @strings = "";
300    foreach my $hashref ( @hashes ) {
301        my %hash = %$hashref;
302        my $substring = "";
303        foreach my $key ( sort keys %hash ) {
304            $substring .= "\0\0" . $key . "\0" . $hash{$key};
305        }
306        push @strings, $substring;
307    }
308    my $string = join("\0\0\0", sort @strings);
309    return md5_hex($string);
310}
311
312=item B<node_exists>
313
314  my $ok = $store->node_exists( "Wombat Defenestration" );
315
316  # or ignore case - optional but recommended
317  my $ok = $store->node_exists(
318                                name        => "monkey brains",
319                                ignore_case => 1,
320                              ); 
321
322Returns true if the node has ever been created (even if it is
323currently empty), and false otherwise.
324
325By default, the case-sensitivity of C<node_exists> depends on your
326database.  If you supply a true value to the C<ignore_case> parameter,
327then you can be sure of its being case-insensitive.  This is
328recommended.
329
330=cut
331
332sub node_exists {
333    my $self = shift;
334    if ( scalar @_ == 1 ) {
335        my $node = shift;
336        return $self->_do_old_node_exists( $node );
337    } else {
338        my %args = @_;
339        return $self->_do_old_node_exists( $args{name} )
340          unless $args{ignore_case};
341        my $sql = $self->_get_node_exists_ignore_case_sql;
342        my $sth = $self->dbh->prepare( $sql );
343        $sth->execute( $args{name} );
344        my $found_name = $sth->fetchrow_array || "";
345        $sth->finish;
346        return lc($found_name) eq lc($args{name}) ? 1 : 0;
347    }
348}
349
350sub _do_old_node_exists {
351    my ($self, $node) = @_;
352    my %data = $self->retrieve_node($node) or return ();
353    return $data{version}; # will be 0 if node doesn't exist, >=1 otherwise
354}
355
356=item B<verify_checksum>
357
358  my $ok = $store->verify_checksum($node, $checksum);
359
360Sees whether your checksum is current for the given node. Returns true
361if so, false if not.
362
363B<NOTE:> Be aware that when called directly and without locking, this
364might not be accurate, since there is a small window between the
365checking and the returning where the node might be changed, so
366B<don't> rely on it for safe commits; use C<write_node> for that. It
367can however be useful when previewing edits, for example.
368
369=cut
370
371sub verify_checksum {
372    my ($self, $node, $checksum) = @_;
373#warn $self;
374    my %node_data = $self->_retrieve_node_data( name => $node );
375    return ( $checksum eq $self->_checksum( %node_data ) );
376}
377
378=item B<list_backlinks>
379
380  # List all nodes that link to the Home Page.
381  my @links = $store->list_backlinks( node => "Home Page" );
382
383=cut
384
385sub list_backlinks {
386    my ( $self, %args ) = @_;
387    my $node = $args{node};
388    croak "Must supply a node name" unless $node;
389    my $dbh = $self->dbh;
390    my $sql = "SELECT link_from FROM internal_links WHERE link_to="
391            . $dbh->quote($node);
392    my $sth = $dbh->prepare($sql);
393    $sth->execute or croak $dbh->errstr;
394    my @backlinks;
395    while ( my ($backlink) = $self->charset_decode( $sth->fetchrow_array ) ) {
396        push @backlinks, $backlink;
397    }
398    return @backlinks;
399}
400
401=item B<list_dangling_links>
402
403  # List all nodes that have been linked to from other nodes but don't
404  # yet exist.
405  my @links = $store->list_dangling_links;
406
407Each node is returned once only, regardless of how many other nodes
408link to it.
409
410=cut
411
412sub list_dangling_links {
413    my $self = shift;
414    my $dbh = $self->dbh;
415    my $sql = "SELECT DISTINCT internal_links.link_to
416               FROM internal_links LEFT JOIN node
417                                   ON node.name=internal_links.link_to
418               WHERE node.version IS NULL";
419    my $sth = $dbh->prepare($sql);
420    $sth->execute or croak $dbh->errstr;
421    my @links;
422    while ( my ($link) = $self->charset_decode( $sth->fetchrow_array ) ) {
423        push @links, $link;
424    }
425    return @links;
426}
427
428=item B<write_node_post_locking>
429
430  $store->write_node_post_locking( node     => $node,
431                                   content  => $content,
432                                   links_to => \@links_to,
433                                   metadata => \%metadata,
434                                   requires_moderation => $requires_moderation,
435                                   plugins  => \@plugins   )
436      or handle_error();
437
438Writes the specified content into the specified node, then calls
439C<post_write> on all supplied plugins, with arguments C<node>,
440C<version>, C<content>, C<metadata>.
441
442Making sure that locking/unlocking/transactions happen is left up to
443you (or your chosen subclass). This method shouldn't really be used
444directly as it might overwrite someone else's changes. Croaks on error
445but otherwise returns true.
446
447Supplying a ref to an array of nodes that this ones links to is
448optional, but if you do supply it then this node will be returned when
449calling C<list_backlinks> on the nodes in C<@links_to>. B<Note> that
450if you don't supply the ref then the store will assume that this node
451doesn't link to any others, and update itself accordingly.
452
453The metadata hashref is also optional, as is requires_moderation.
454
455B<Note> on the metadata hashref: Any data in here that you wish to
456access directly later must be a key-value pair in which the value is
457either a scalar or a reference to an array of scalars.  For example:
458
459  $wiki->write_node( "Calthorpe Arms", "nice pub", $checksum,
460                     { category => [ "Pubs", "Bloomsbury" ],
461                       postcode => "WC1X 8JR" } );
462
463  # and later
464
465  my @nodes = $wiki->list_nodes_by_metadata(
466      metadata_type  => "category",
467      metadata_value => "Pubs"             );
468
469For more advanced usage (passing data through to registered plugins)
470you may if you wish pass key-value pairs in which the value is a
471hashref or an array of hashrefs. The data in the hashrefs will not be
472stored as metadata; it will be checksummed and the checksum will be
473stored instead (as C<__metadatatypename__checksum>). Such data can
474I<only> be accessed via plugins.
475
476=cut
477
478sub write_node_post_locking {
479    my ($self, %args) = @_;
480    my ($node, $content, $links_to_ref, $metadata_ref, $requires_moderation) =
481             @args{ qw( node content links_to metadata requires_moderation) };
482    my $dbh = $self->dbh;
483
484    my $timestamp = $self->_get_timestamp();
485    my @links_to = @{ $links_to_ref || [] }; # default to empty array
486    my $version;
487        unless($requires_moderation) { $requires_moderation = 0; }
488
489    # Call pre_write on any plugins, in case they want to tweak anything
490    my @preplugins = @{ $args{plugins} || [ ] };
491        my $write_allowed = 1;
492    foreach my $plugin (@preplugins) {
493        if ( $plugin->can( "pre_write" ) ) {
494                        handle_pre_plugin_ret(
495                                \$write_allowed,
496                                $plugin->pre_write( 
497                                        node     => \$node,
498                                        content  => \$content,
499                                        metadata => \$metadata_ref )
500                        );
501        }
502    }
503        if($write_allowed < 1) {
504                # The plugins didn't want to allow this action
505                return -1;
506        }
507
508    # Either inserting a new page or updating an old one.
509    my $sql = "SELECT count(*) FROM node WHERE name=" . $dbh->quote($node);
510    my $exists = @{ $dbh->selectcol_arrayref($sql) }[0] || 0;
511
512
513        # If it doesn't exist, add it right now
514    if(! $exists) {
515                # Add in a new version
516        $version = 1;
517
518                # Handle initial moderation
519                my $node_content = $content;
520                if($requires_moderation) {
521                        $node_content = "=== This page has yet to be moderated. ===";
522                }
523
524                # Add the node and content
525        my $add_sql = 
526                         "INSERT INTO node "
527                        ."    (name, version, text, modified, moderate) "
528                        ."VALUES (?, ?, ?, ?, ?)";
529                my $add_sth = $dbh->prepare($add_sql);
530                $add_sth->execute(
531                        map{ $self->charset_encode($_) }
532                                ($node, $version, $node_content, $timestamp, $requires_moderation)
533                ) or croak "Error updating database: " . DBI->errstr;
534    }
535
536    # Get the ID of the node we've added / we're about to update
537        # Also get the moderation status for it
538    $sql = "SELECT id, moderate FROM node WHERE name=" . $dbh->quote($node);
539    my ($node_id,$node_requires_moderation) = $dbh->selectrow_array($sql);
540
541        # Only update node if it exists, and moderation isn't enabled on the node
542        # Whatever happens, if it exists, generate a new version number
543    if($exists) {
544                # Get the new version number
545        $sql = "SELECT max(content.version) FROM node
546                INNER JOIN content ON (id = node_id)
547                WHERE name=" . $dbh->quote($node);
548        $version = @{ $dbh->selectcol_arrayref($sql) }[0] || 0;
549        croak "Can't get version number" unless $version;
550        $version++;
551
552                # Update the node only if node doesn't require moderation
553                if(!$node_requires_moderation) {
554                        $sql = "UPDATE node SET version=" . $dbh->quote($version)
555                         . ", text=" . $dbh->quote($self->charset_encode($content))
556                         . ", modified=" . $dbh->quote($timestamp)
557                         . " WHERE name=" . $dbh->quote($self->charset_encode($node));
558                        $dbh->do($sql) or croak "Error updating database: " . DBI->errstr;
559                }
560
561                # You can't use this to enable moderation on an existing node
562        if($requires_moderation) {
563           warn("Moderation not added to existing node '$node', use normal moderation methods instead");
564        }
565        }
566
567
568    # Now node is updated (if required), add to the history
569    my $add_sql = 
570                 "INSERT INTO content "
571                ."      (node_id, version, text, modified, moderated) "
572                ."VALUES (?, ?, ?, ?, ?)";
573        my $add_sth = $dbh->prepare($add_sql);
574        $add_sth->execute(
575                map { $self->charset_encode($_) }
576                        ($node_id, $version, $content, $timestamp, (1-$node_requires_moderation))
577    ) or croak "Error updating database: " . DBI->errstr;
578
579
580    # Update the backlinks.
581    $dbh->do("DELETE FROM internal_links WHERE link_from="
582             . $dbh->quote($self->charset_encode($node)) ) or croak $dbh->errstr;
583    foreach my $links_to ( @links_to ) {
584        $sql = "INSERT INTO internal_links (link_from, link_to) VALUES ("
585             . join(", ", map { $dbh->quote($self->charset_encode($_)) } ( $node, $links_to ) ) . ")";
586        # Better to drop a backlink or two than to lose the whole update.
587        # Shevek wants a case-sensitive wiki, Jerakeen wants a case-insensitive
588        # one, MySQL compares case-sensitively on varchars unless you add
589        # the binary keyword.  Case-sensitivity to be revisited.
590        eval { $dbh->do($sql); };
591        carp "Couldn't index backlink: " . $dbh->errstr if $@;
592    }
593
594    # And also store any metadata.  Note that any entries already in the
595    # metadata table refer to old versions, so we don't need to delete them.
596    my %metadata = %{ $metadata_ref || {} }; # default to no metadata
597    foreach my $type ( keys %metadata ) {
598        my $val = $metadata{$type};
599
600        # We might have one or many values; make an array now to merge cases.
601        my @values = (ref $val and ref $val eq 'ARRAY') ? @$val : ( $val );
602
603        # Find out whether all values for this type are scalars.
604        my $all_scalars = 1;
605        foreach my $value (@values) {
606            $all_scalars = 0 if ref $value;
607        }
608
609        # For adding to metadata
610        my $add_sql = 
611              "INSERT INTO metadata "
612             ."   (node_id, version, metadata_type, metadata_value) "
613             ."VALUES (?, ?, ?, ?)";
614        my $add_sth = $dbh->prepare($add_sql);
615
616        # If all values for this type are scalars, strip out any duplicates
617        # and store the data.
618        if ( $all_scalars ) {
619            my %unique = map { $_ => 1 } @values;
620            @values = keys %unique;
621
622            foreach my $value ( @values ) {
623                                $add_sth->execute(
624                    map { $self->charset_encode($_) }
625                        ( $node_id, $version, $type, $value )
626                    ) or croak $dbh->errstr;
627            }
628            } else {
629            # Otherwise grab a checksum and store that.
630            my $type_to_store  = "__" . $type . "__checksum";
631            my $value_to_store = $self->_checksum_hashes( @values );
632            $add_sth->execute(
633                  map { $self->charset_encode($_) }
634                      ( $node_id, $version, $type_to_store, $value_to_store )
635            )  or croak $dbh->errstr;
636            }
637    }
638
639    # Finally call post_write on any plugins.
640    my @postplugins = @{ $args{plugins} || [ ] };
641    foreach my $plugin (@postplugins) {
642        if ( $plugin->can( "post_write" ) ) {
643            $plugin->post_write( 
644                                node     => $node,
645                                node_id  => $node_id,
646                                version  => $version,
647                                content  => $content,
648                                metadata => $metadata_ref );
649        }
650    }
651
652    return 1;
653}
654
655# Returns the timestamp of now, unless epoch is supplied.
656sub _get_timestamp {
657    my $self = shift;
658    # I don't care about no steenkin' timezones (yet).
659    my $time = shift || localtime; # Overloaded by Time::Piece.
660    unless( ref $time ) {
661        $time = localtime($time); # Make it into an object for strftime
662    }
663    return $time->strftime($timestamp_fmt); # global
664}
665
666=item B<rename_node>
667
668  $store->rename_node(
669                         old_name  => $node,
670                         new_name  => $new_node,
671                         wiki      => $wiki,
672                         create_new_versions => $create_new_versions,
673                       );
674
675Renames a node, updating any references to it as required (assuming your
676chosen formatter supports rename, that is).
677
678Uses the internal_links table to identify the nodes that link to this
679one, and re-writes any wiki links in these to point to the new name.
680
681=cut
682
683sub rename_node {
684    my ($self, %args) = @_;
685    my ($old_name,$new_name,$wiki,$create_new_versions) = 
686                @args{ qw( old_name new_name wiki create_new_versions ) };
687    my $dbh = $self->dbh;
688        my $formatter = $wiki->{_formatter};
689
690    my $timestamp = $self->_get_timestamp();
691
692    # Call pre_rename on any plugins, in case they want to tweak anything
693    my @preplugins = @{ $args{plugins} || [ ] };
694        my $rename_allowed = 1;
695    foreach my $plugin (@preplugins) {
696        if ( $plugin->can( "pre_rename" ) ) {
697                        handle_pre_plugin_ret(
698                                \$rename_allowed,
699                                $plugin->pre_rename( 
700                                        old_name => \$old_name,
701                                        new_name => \$new_name,
702                                        create_new_versions => \$create_new_versions,
703                                )
704                        );
705        }
706    }
707        if($rename_allowed < 1) {
708                # The plugins didn't want to allow this action
709                return -1;
710        }
711
712        # Get the ID of the node
713        my $sql = "SELECT id FROM node WHERE name=?";
714        my $sth = $dbh->prepare($sql);
715        $sth->execute($old_name);
716        my ($node_id) = $sth->fetchrow_array;
717    $sth->finish;
718
719
720        # If the formatter supports it, get a list of the internal
721        #  links to the page, which will have their links re-written
722        # (Do now before we update the name of the node, in case of
723        #  self links)
724        my @links;
725        if($formatter->can("rename_links")) {
726                # Get a list of the pages that link to the page
727                $sql = "SELECT id, name, version "
728                        ."FROM internal_links "
729                        ."INNER JOIN node "
730                        ."      ON (link_from = name) "
731                        ."WHERE link_to = ?";
732                $sth = $dbh->prepare($sql);
733                $sth->execute($old_name);
734
735                # Grab them all, then update, so no locking problems
736                while(my @l = $sth->fetchrow_array) { push (@links, \@l); }
737        }
738
739       
740        # Rename the node
741        $sql = "UPDATE node SET name=? WHERE id=?";
742        $sth = $dbh->prepare($sql);
743        $sth->execute($new_name,$node_id);
744
745
746        # Fix the internal links from this page
747        # (Otherwise write_node will get confused if we rename links later on)
748        $sql = "UPDATE internal_links SET link_from=? WHERE link_from=?";
749        $sth = $dbh->prepare($sql);
750        $sth->execute($new_name,$old_name);
751
752
753        # Update the text of internal links, if the formatter supports it
754        if($formatter->can("rename_links")) {
755                # Update the linked pages (may include renamed page)
756                foreach my $l (@links) {
757                        my ($page_id, $page_name, $page_version) = @$l;
758                        # Self link special case
759                        if($page_name eq $old_name) { $page_name = $new_name; }
760
761                        # Grab the latest version of that page
762                        my %page = $self->retrieve_node(
763                                        name=>$page_name, version=>$page_version
764                        );
765
766                        # Update the content of the page
767                        my $new_content = 
768                                $formatter->rename_links($old_name,$new_name,$page{'content'});
769
770                        # Did it change?
771                        if($new_content ne $page{'content'}) {
772                                # Write the updated page out
773                                if($create_new_versions) {
774                                        # Write out as a new version of the node
775                                        # (This will also fix our internal links)
776                                        $wiki->write_node(
777                                                                $page_name, 
778                                                                $new_content,
779                                                                $page{checksum},
780                                                                $page{metadata}
781                                        );
782                                } else {
783                                        # Just update the content
784                                        my $update_sql_a = "UPDATE node SET text=? WHERE id=?";
785                                        my $update_sql_b = "UPDATE content SET text=? ".
786                                                                           "WHERE node_id=? AND version=?";
787
788                                        my $u_sth = $dbh->prepare($update_sql_a);
789                                        $u_sth->execute($new_content,$page_id);
790                                        $u_sth = $dbh->prepare($update_sql_b);
791                                        $u_sth->execute($new_content,$page_id,$page_version);
792                                }
793                        }
794                }
795
796                # Fix the internal links if we didn't create new versions of the node
797                if(! $create_new_versions) {
798                        $sql = "UPDATE internal_links SET link_to=? WHERE link_to=?";
799                        $sth = $dbh->prepare($sql);
800                        $sth->execute($new_name,$old_name);
801                }
802        } else {
803                warn("Internal links not updated following node rename - unsupported by formatter");
804        }
805
806    # Call post_rename on any plugins, in case they want to do anything
807    my @postplugins = @{ $args{plugins} || [ ] };
808    foreach my $plugin (@postplugins) {
809        if ( $plugin->can( "post_rename" ) ) {
810            $plugin->post_rename( 
811                                old_name => $old_name,
812                                new_name => $new_name,
813                                node_id => $node_id,
814                        );
815        }
816    }
817}
818
819=item B<moderate_node>
820
821  $store->moderate_node(
822                         name    => $node,
823                         version => $version
824                       );
825
826Marks the given version of the node as moderated. If this is the
827highest moderated version, then update the node's contents to hold
828this version.
829
830=cut
831
832sub moderate_node {
833    my $self = shift;
834    my %args = scalar @_ == 2 ? ( name => $_[0], version => $_[1] ) : @_;
835    my $dbh = $self->dbh;
836
837        my ($name,$version) = ($args{name},$args{version});
838
839    # Call pre_moderate on any plugins.
840    my @plugins = @{ $args{plugins} || [ ] };
841        my $moderation_allowed = 1;
842    foreach my $plugin (@plugins) {
843        if ( $plugin->can( "pre_moderate" ) ) {
844                        handle_pre_plugin_ret(
845                                \$moderation_allowed,
846                                $plugin->pre_moderate( 
847                                        node     => \$name,
848                                        version  => \$version )
849                        );
850        }
851    }
852        if($moderation_allowed < 1) {
853                # The plugins didn't want to allow this action
854                return -1;
855        }
856
857        # Get the ID of this node
858    my $id_sql = "SELECT id FROM node WHERE name=?";
859    my $id_sth = $dbh->prepare($id_sql);
860    $id_sth->execute($name);
861        my ($node_id) = $id_sth->fetchrow_array;
862    $id_sth->finish;
863
864        # Check what the current highest moderated version is
865        my $hv_sql = 
866                 "SELECT max(version) "
867                ."FROM content "
868                ."WHERE node_id = ? "
869                ."AND moderated = ?";
870        my $hv_sth = $dbh->prepare($hv_sql);
871        $hv_sth->execute($node_id, "1") or croak $dbh->errstr;
872        my ($highest_mod_version) = $hv_sth->fetchrow_array;
873    $hv_sth->finish;
874        unless($highest_mod_version) { $highest_mod_version = 0; }
875
876        # Mark this version as moderated
877        my $update_sql = 
878                 "UPDATE content "
879                ."SET moderated = ? "
880                ."WHERE node_id = ? "
881                ."AND version = ?";
882        my $update_sth = $dbh->prepare($update_sql);
883        $update_sth->execute("1", $node_id, $version) or croak $dbh->errstr;
884
885        # Are we now the highest moderated version?
886        if(int($version) > int($highest_mod_version)) {
887                # Newly moderated version is newer than previous moderated version
888                # So, make the current version the latest version
889                my %new_data = $self->retrieve_node( name => $name, version => $version );
890
891                # Make sure last modified is properly null, if not set
892                unless($new_data{last_modified}) { $new_data{last_modified} = undef; }
893
894                my $newv_sql = 
895                         "UPDATE node "
896                        ."SET version=?, text=?, modified=? "
897                        ."WHERE id = ?";
898                my $newv_sth = $dbh->prepare($newv_sql);
899                $newv_sth->execute(
900                        $version, $self->charset_encode($new_data{content}), 
901                        $new_data{last_modified}, $node_id
902                ) or croak $dbh->errstr;
903        } else {
904                # A higher version is already moderated, so don't change node
905        }
906
907        # TODO: Do something about internal links, if required
908
909    # Finally call post_moderate on any plugins.
910    @plugins = @{ $args{plugins} || [ ] };
911    foreach my $plugin (@plugins) {
912        if ( $plugin->can( "post_moderate" ) ) {
913            $plugin->post_moderate( 
914                                node     => $name,
915                                node_id  => $node_id,
916                                version  => $version );
917        }
918    }
919
920        return 1;
921}
922
923=item B<set_node_moderation>
924
925  $store->set_node_moderation(
926                         name     => $node,
927                         required => $required
928                       );
929
930Sets if new node versions will require moderation or not
931
932=cut
933
934sub set_node_moderation {
935    my $self = shift;
936    my %args = scalar @_ == 2 ? ( name => $_[0], required => $_[1] ) : @_;
937    my $dbh = $self->dbh;
938
939        my ($name,$required) = ($args{name},$args{required});
940
941        # Get the ID of this node
942    my $id_sql = "SELECT id FROM node WHERE name=?";
943    my $id_sth = $dbh->prepare($id_sql);
944    $id_sth->execute($name);
945        my ($node_id) = $id_sth->fetchrow_array;
946    $id_sth->finish;
947
948        # Check we really got an ID
949    unless($node_id) {
950        return 0;
951    }
952
953        # Mark it as requiring / not requiring moderation
954        my $mod_sql = 
955                 "UPDATE node "
956                ."SET moderate = ? "
957                ."WHERE id = ? ";
958        my $mod_sth = $dbh->prepare($mod_sql);
959        $mod_sth->execute("$required", $node_id) or croak $dbh->errstr;
960
961        return 1;
962}
963
964=item B<delete_node>
965
966  $store->delete_node(
967                       name    => $node,
968                       version => $version,
969                       wiki    => $wiki
970                     );
971
972C<version> is optional.  If it is supplied then only that version of
973the node will be deleted.  Otherwise the node and all its history will
974be completely deleted.
975
976C<wiki> is also optional, but if you care about updating the backlinks
977you want to include it.
978
979Again, doesn't do any locking. You probably don't want to let anyone
980except Wiki admins call this. You may not want to use it at all.
981
982Croaks on error, silently does nothing if the node or version doesn't
983exist, returns true if no error.
984
985=cut
986
987sub delete_node {
988    my $self = shift;
989    # Backwards compatibility.
990    my %args = ( scalar @_ == 1 ) ? ( name => $_[0] ) : @_;
991
992    my $dbh = $self->dbh;
993    my ($name, $version, $wiki) = @args{ qw( name version wiki ) };
994
995    # Grab the ID of this node
996    # (It will only ever have one entry in node, but might have entries
997    #  for other versions in metadata and content)
998    my $id_sql = "SELECT id FROM node WHERE name=?";
999    my $id_sth = $dbh->prepare($id_sql);
1000    $id_sth->execute($name);
1001        my ($node_id) = $id_sth->fetchrow_array;
1002    $id_sth->finish;
1003
1004    # Trivial case - delete the whole node and all its history.
1005    unless ( $version ) {
1006        my $sql;
1007        # Should start a transaction here.  FIXME.
1008        # Do deletes
1009        $sql = "DELETE FROM content WHERE node_id = $node_id";
1010        $dbh->do($sql) or croak "Deletion failed: " . DBI->errstr;
1011        $sql = "DELETE FROM internal_links WHERE link_from=".$dbh->quote($name);
1012        $dbh->do($sql) or croak $dbh->errstr;
1013        $sql = "DELETE FROM metadata WHERE node_id = $node_id";
1014        $dbh->do($sql) or croak $dbh->errstr;
1015        $sql = "DELETE FROM node WHERE id = $node_id";
1016        $dbh->do($sql) or croak "Deletion failed: " . DBI->errstr;
1017
1018        # And finish it here.
1019                post_delete_node($name,$node_id,$version,$args{plugins});
1020        return 1;
1021    }
1022
1023    # Skip out early if we're trying to delete a nonexistent version.
1024    my %verdata = $self->retrieve_node( name => $name, version => $version );
1025        unless($verdata{version}) {
1026                warn("Asked to delete non existant version $version of node $node_id ($name)");
1027                return 1;
1028        }
1029
1030    # Reduce to trivial case if deleting the only version.
1031    my $sql = "SELECT COUNT(*) FROM content WHERE node_id = $node_id";
1032    my $sth = $dbh->prepare( $sql );
1033    $sth->execute() or croak "Deletion failed: " . $dbh->errstr;
1034    my ($count) = $sth->fetchrow_array;
1035    $sth->finish;
1036        if($count == 1) {
1037                # Only one version, so can do the non version delete
1038            return $self->delete_node( name=>$name, plugins=>$args{plugins} );
1039        }
1040
1041    # Check whether we're deleting the latest (moderated) version.
1042    my %currdata = $self->retrieve_node( name => $name );
1043    if ( $currdata{version} == $version ) {
1044                # Deleting latest version, so need to update the copy in node
1045        # (Can't just grab version ($version - 1) since it may have been
1046        #  deleted itself, or might not be moderated.)
1047        my $try = $version - 1;
1048        my %prevdata;
1049        until ( $prevdata{version} && $prevdata{moderated} ) {
1050            %prevdata = $self->retrieve_node(
1051                                              name    => $name,
1052                                              version => $try,
1053                                            );
1054            $try--;
1055        }
1056
1057        # Move to new (old) version
1058        my $sql="UPDATE node
1059                 SET version=?, text=?, modified=?
1060                 WHERE name=?";
1061        my $sth = $dbh->prepare( $sql );
1062        $sth->execute( @prevdata{ qw( version content last_modified ) }, $name)
1063          or croak "Deletion failed: " . $dbh->errstr;
1064
1065                # Remove the current version from content
1066        $sql = "DELETE FROM content
1067                WHERE node_id = $node_id
1068                AND version = $version";
1069        $sth = $dbh->prepare( $sql );
1070        $sth->execute()
1071          or croak "Deletion failed: " . $dbh->errstr;
1072
1073                # Update the internal links to reflect the new version
1074        $sql = "DELETE FROM internal_links WHERE link_from=?";
1075        $sth = $dbh->prepare( $sql );
1076        $sth->execute( $name )
1077          or croak "Deletion failed: " . $dbh->errstr;
1078        my @links_to;
1079        my $formatter = $wiki->formatter;
1080        if ( $formatter->can( "find_internal_links" ) ) {
1081            # Supply $metadata to formatter in case it's needed to alter the
1082            # behaviour of the formatter, eg for Wiki::Toolkit::Formatter::Multiple
1083            my @all = $formatter->find_internal_links(
1084                                    $prevdata{content}, $prevdata{metadata} );
1085            my %unique = map { $_ => 1 } @all;
1086            @links_to = keys %unique;
1087        }
1088        $sql = "INSERT INTO internal_links (link_from, link_to) VALUES (?,?)";
1089        $sth = $dbh->prepare( $sql );
1090        foreach my $link ( @links_to ) {
1091            eval { $sth->execute( $name, $link ); };
1092            carp "Couldn't index backlink: " . $dbh->errstr if $@;
1093        }
1094
1095                # Delete the metadata for the old version
1096        $sql = "DELETE FROM metadata
1097                WHERE node_id = $node_id
1098                AND version = $version";
1099        $sth = $dbh->prepare( $sql );
1100        $sth->execute()
1101          or croak "Deletion failed: " . $dbh->errstr;
1102
1103                # All done
1104                post_delete_node($name,$node_id,$version,$args{plugins});
1105        return 1;
1106    }
1107
1108    # If we're still here, then we're deleting neither the latest
1109    # nor the only version.
1110    $sql = "DELETE FROM content
1111            WHERE node_id = $node_id
1112            AND version=?";
1113    $sth = $dbh->prepare( $sql );
1114    $sth->execute( $version )
1115      or croak "Deletion failed: " . $dbh->errstr;
1116    $sql = "DELETE FROM metadata
1117            WHERE node_id = $node_id
1118            AND version=?";
1119    $sth = $dbh->prepare( $sql );
1120    $sth->execute( $version )
1121      or croak "Deletion failed: " . $dbh->errstr;
1122
1123        # All done
1124        post_delete_node($name,$node_id,$version,$args{plugins});
1125    return 1;
1126}
1127
1128# Returns the name of the node with the given ID
1129# Not normally used except when doing low-level maintenance
1130sub node_name_for_id {
1131        my ($self, $node_id) = @_;
1132    my $dbh = $self->dbh;
1133
1134    my $name_sql = "SELECT name FROM node WHERE id=?";
1135    my $name_sth = $dbh->prepare($name_sql);
1136    $name_sth->execute($node_id);
1137        my ($name) = $name_sth->fetchrow_array;
1138    $name_sth->finish;
1139
1140        return $name;
1141}
1142
1143# Internal Method
1144sub post_delete_node {
1145        my ($name,$node_id,$version,$plugins) = @_;
1146
1147    # Call post_delete on any plugins, having done the delete
1148    my @plugins = @{ $plugins || [ ] };
1149    foreach my $plugin (@plugins) {
1150        if ( $plugin->can( "post_delete" ) ) {
1151            $plugin->post_delete( 
1152                                node     => $name,
1153                                node_id  => $node_id,
1154                                version  => $version );
1155        }
1156    }
1157}
1158
1159=item B<list_recent_changes>
1160
1161  # Nodes changed in last 7 days - each node listed only once.
1162  my @nodes = $store->list_recent_changes( days => 7 );
1163
1164  # Nodes added in the last 7 days.
1165  my @nodes = $store->list_recent_changes(
1166                                           days     => 7,
1167                                           new_only => 1,
1168                                         );
1169
1170  # All changes in last 7 days - nodes changed more than once will
1171  # be listed more than once.
1172  my @nodes = $store->list_recent_changes(
1173                                           days => 7,
1174                                           include_all_changes => 1,
1175                                         );
1176
1177  # Nodes changed between 1 and 7 days ago.
1178  my @nodes = $store->list_recent_changes( between_days => [ 1, 7 ] );
1179
1180  # Nodes changed since a given time.
1181  my @nodes = $store->list_recent_changes( since => 1036235131 );
1182
1183  # Most recent change and its details.
1184  my @nodes = $store->list_recent_changes( last_n_changes => 1 );
1185  print "Node:          $nodes[0]{name}";
1186  print "Last modified: $nodes[0]{last_modified}";
1187  print "Comment:       $nodes[0]{metadata}{comment}";
1188
1189  # Last 5 restaurant nodes edited.
1190  my @nodes = $store->list_recent_changes(
1191      last_n_changes => 5,
1192      metadata_is    => { category => "Restaurants" }
1193  );
1194
1195  # Last 5 nodes edited by Kake.
1196  my @nodes = $store->list_recent_changes(
1197      last_n_changes => 5,
1198      metadata_was   => { username => "Kake" }
1199  );
1200
1201  # All minor edits made by Earle in the last week.
1202  my @nodes = $store->list_recent_changes(
1203      days           => 7,
1204      metadata_was   => { username  => "Earle",
1205                          edit_type => "Minor tidying." }
1206  );
1207
1208  # Last 10 changes that weren't minor edits.
1209  my @nodes = $store->list_recent_changes(
1210      last_n_changes => 10,
1211      metadata_wasnt  => { edit_type => "Minor tidying" }
1212  );
1213
1214You I<must> supply one of the following constraints: C<days>
1215(integer), C<since> (epoch), C<last_n_changes> (integer).
1216
1217You I<may> also supply moderation => 1 if you only want to see versions
1218that are moderated.
1219
1220Another optional parameter is C<new_only>, which if set to 1 will only
1221return newly added nodes.
1222
1223You I<may> also supply I<either> C<metadata_is> (and optionally
1224C<metadata_isnt>), I<or> C<metadata_was> (and optionally
1225C<metadata_wasnt>). Each of these should be a ref to a hash with
1226scalar keys and values.  If the hash has more than one entry, then
1227only changes satisfying I<all> criteria will be returned when using
1228C<metadata_is> or C<metadata_was>, but all changes which fail to
1229satisfy any one of the criteria will be returned when using
1230C<metadata_isnt> or C<metadata_is>.
1231
1232C<metadata_is> and C<metadata_isnt> look only at the metadata that the
1233node I<currently> has. C<metadata_was> and C<metadata_wasnt> take into
1234account the metadata of previous versions of a node.  Don't mix C<is>
1235with C<was> - there's no check for this, but the results are undefined.
1236
1237Returns results as an array, in reverse chronological order.  Each
1238element of the array is a reference to a hash with the following entries:
1239
1240=over 4
1241
1242=item * B<name>: the name of the node
1243
1244=item * B<version>: the latest version number
1245
1246=item * B<last_modified>: the timestamp of when it was last modified
1247
1248=item * B<metadata>: a ref to a hash containing any metadata attached
1249to the current version of the node
1250
1251=back
1252
1253Unless you supply C<include_all_changes>, C<metadata_was> or
1254C<metadata_wasnt>, each node will only be returned once regardless of
1255how many times it has been changed recently.
1256
1257By default, the case-sensitivity of both C<metadata_type> and
1258C<metadata_value> depends on your database - if it will return rows
1259with an attribute value of "Pubs" when you asked for "pubs", or not.
1260If you supply a true value to the C<ignore_case> parameter, then you
1261can be sure of its being case-insensitive.  This is recommended.
1262
1263=cut
1264
1265sub list_recent_changes {
1266    my $self = shift;
1267    my %args = @_;
1268    if ($args{since}) {
1269        return $self->_find_recent_changes_by_criteria( %args );
1270    } elsif ($args{between_days}) {
1271        return $self->_find_recent_changes_by_criteria( %args );
1272    } elsif ( $args{days} ) {
1273        my $now = localtime;
1274        my $then = $now - ( ONE_DAY * $args{days} );
1275        $args{since} = $then;
1276        delete $args{days};
1277        return $self->_find_recent_changes_by_criteria( %args );
1278    } elsif ( $args{last_n_changes} ) {
1279        $args{limit} = delete $args{last_n_changes};
1280        return $self->_find_recent_changes_by_criteria( %args );
1281    } else {
1282        croak "Need to supply some criteria to list_recent_changes.";
1283    }
1284}
1285
1286sub _find_recent_changes_by_criteria {
1287    my ($self, %args) = @_;
1288    my ($since, $limit, $between_days, $ignore_case, $new_only,
1289        $metadata_is,  $metadata_isnt, $metadata_was, $metadata_wasnt,
1290        $moderation, $include_all_changes ) =
1291         @args{ qw( since limit between_days ignore_case new_only
1292                    metadata_is metadata_isnt metadata_was metadata_wasnt
1293                    moderation include_all_changes) };
1294    my $dbh = $self->dbh;
1295
1296    my @where;
1297    my @metadata_joins;
1298    my $use_content_table; # some queries won't need this
1299
1300    if ( $metadata_is ) {
1301        my $main_table = "node";
1302        if ( $include_all_changes ) {
1303            $main_table = "content";
1304            $use_content_table = 1;
1305        }
1306        my $i = 0;
1307        foreach my $type ( keys %$metadata_is ) {
1308            $i++;
1309            my $value  = $metadata_is->{$type};
1310            croak "metadata_is must have scalar values" if ref $value;
1311            my $mdt = "md_is_$i";
1312            push @metadata_joins, "LEFT JOIN metadata AS $mdt
1313                                   ON $main_table."
1314                                   . ( ($main_table eq "node") ? "id"
1315                                                               : "node_id" )
1316                                   . "=$mdt.node_id
1317                                   AND $main_table.version=$mdt.version\n";
1318            # Why is this inside 'if ( $metadata_is )'?
1319            # Shouldn't it apply to all cases?
1320            # What's it doing in @metadata_joins?
1321            if (defined $moderation) {
1322                push @metadata_joins, "AND $main_table.moderate=$moderation";
1323            }
1324            push @where, "( "
1325                         . $self->_get_comparison_sql(
1326                                          thing1      => "$mdt.metadata_type",
1327                                          thing2      => $dbh->quote($type),
1328                                          ignore_case => $ignore_case,
1329                                                     )
1330                         . " AND "
1331                         . $self->_get_comparison_sql(
1332                                          thing1      => "$mdt.metadata_value",
1333                                          thing2      => $dbh->quote( $self->charset_encode($value) ),
1334                                          Ignore_case => $ignore_case,
1335                                                     )
1336                         . " )";
1337        }
1338    }
1339
1340    if ( $metadata_isnt ) {
1341        foreach my $type ( keys %$metadata_isnt ) {
1342            my $value  = $metadata_isnt->{$type};
1343            croak "metadata_isnt must have scalar values" if ref $value;
1344        }
1345        my @omits = $self->_find_recent_changes_by_criteria(
1346            since        => $since,
1347            between_days => $between_days,
1348            metadata_is  => $metadata_isnt,
1349            ignore_case  => $ignore_case,
1350        );
1351        foreach my $omit ( @omits ) {
1352            push @where, "( node.name != " . $dbh->quote($omit->{name})
1353                 . "  OR node.version != " . $dbh->quote($omit->{version})
1354                 . ")";
1355        }
1356    }
1357
1358    if ( $metadata_was ) {
1359        $use_content_table = 1;
1360        my $i = 0;
1361        foreach my $type ( keys %$metadata_was ) {
1362            $i++;
1363            my $value  = $metadata_was->{$type};
1364            croak "metadata_was must have scalar values" if ref $value;
1365            my $mdt = "md_was_$i";
1366            push @metadata_joins, "LEFT JOIN metadata AS $mdt
1367                                   ON content.node_id=$mdt.node_id
1368                                   AND content.version=$mdt.version\n";
1369            push @where, "( "
1370                         . $self->_get_comparison_sql(
1371                                          thing1      => "$mdt.metadata_type",
1372                                          thing2      => $dbh->quote($type),
1373                                          ignore_case => $ignore_case,
1374                                                     )
1375                         . " AND "
1376                         . $self->_get_comparison_sql(
1377                                          thing1      => "$mdt.metadata_value",
1378                                          thing2      => $dbh->quote( $self->charset_encode($value) ),
1379                                          ignore_case => $ignore_case,
1380                                                     )
1381                         . " )";
1382        }
1383    }
1384
1385    if ( $metadata_wasnt ) {
1386        foreach my $type ( keys %$metadata_wasnt ) {
1387                my $value  = $metadata_was->{$type};
1388                croak "metadata_was must have scalar values" if ref $value;
1389        }
1390        my @omits = $self->_find_recent_changes_by_criteria(
1391                since        => $since,
1392                between_days => $between_days,
1393                metadata_was => $metadata_wasnt,
1394                ignore_case  => $ignore_case,
1395        );
1396        foreach my $omit ( @omits ) {
1397            push @where, "( node.name != " . $dbh->quote($omit->{name})
1398                 . "  OR content.version != " . $dbh->quote($omit->{version})
1399                 . ")";
1400        }
1401        $use_content_table = 1;
1402    }
1403
1404    # Figure out which table we should be joining to to check the dates and
1405    # versions - node or content.
1406    my $date_table = "node";
1407    if ( $include_all_changes || $new_only ) {
1408        $date_table = "content";
1409        $use_content_table = 1;
1410    }
1411    if ( $new_only ) {
1412        push @where, "content.version=1";
1413    }
1414
1415    if ( $since ) {
1416        my $timestamp = $self->_get_timestamp( $since );
1417        push @where, "$date_table.modified >= " . $dbh->quote($timestamp);
1418    } elsif ( $between_days ) {
1419        my $now = localtime;
1420        # Start is the larger number of days ago.
1421        my ($start, $end) = @$between_days;
1422        ($start, $end) = ($end, $start) if $start < $end;
1423        my $ts_start = $self->_get_timestamp( $now - (ONE_DAY * $start) ); 
1424        my $ts_end = $self->_get_timestamp( $now - (ONE_DAY * $end) ); 
1425        push @where, "$date_table.modified >= " . $dbh->quote($ts_start);
1426        push @where, "$date_table.modified <= " . $dbh->quote($ts_end);
1427    }
1428
1429    my $sql = "SELECT DISTINCT
1430                               node.name,
1431              ";
1432    if ( $include_all_changes || $new_only || $use_content_table ) {
1433        $sql .= " content.version, content.modified ";
1434    } else {
1435        $sql .= " node.version, node.modified ";
1436    }
1437    $sql .= " FROM node ";
1438    if ( $use_content_table ) {
1439        $sql .= " INNER JOIN content ON (node.id = content.node_id ) ";
1440    }
1441
1442    $sql .= join("\n", @metadata_joins)
1443            . (
1444                scalar @where
1445                              ? " WHERE " . join(" AND ",@where) 
1446                              : ""
1447              )
1448            . " ORDER BY "
1449            . ( $use_content_table ? "content" : "node" )
1450            . ".modified DESC";
1451    if ( $limit ) {
1452        croak "Bad argument $limit" unless $limit =~ /^\d+$/;
1453        $sql .= " LIMIT $limit";
1454    }
1455#print "\n\n$sql\n\n";
1456    my $nodesref = $dbh->selectall_arrayref($sql);
1457    my @finds = map { { name          => $_->[0],
1458                        version       => $_->[1],
1459                        last_modified => $_->[2] }
1460                    } @$nodesref;
1461    foreach my $find ( @finds ) {
1462        my %metadata;
1463        my $sth = $dbh->prepare( "SELECT metadata_type, metadata_value
1464                                  FROM node
1465                                  INNER JOIN metadata
1466                                        ON (id = node_id)
1467                                  WHERE name=?
1468                                  AND metadata.version=?" );
1469        $sth->execute( $find->{name}, $find->{version} );
1470        while ( my ($type, $value) = $self->charset_decode( $sth->fetchrow_array ) ) {
1471            if ( defined $metadata{$type} ) {
1472                push @{$metadata{$type}}, $value;
1473            } else {
1474                $metadata{$type} = [ $value ];
1475            }
1476        }
1477        $find->{metadata} = \%metadata;
1478    }
1479    return @finds;
1480}
1481
1482=item B<list_all_nodes>
1483
1484  my @nodes = $store->list_all_nodes();
1485  print "First node is $nodes[0]\n";
1486
1487  my @nodes = $store->list_all_nodes( with_details=> 1 );
1488  print "First node is ".$nodes[0]->{'name'}." at version ".$nodes[0]->{'version'}."\n";
1489
1490Returns a list containing the name of every existing node.  The list
1491won't be in any kind of order; do any sorting in your calling script.
1492
1493Optionally also returns the id, version and moderation flag.
1494
1495=cut
1496
1497sub list_all_nodes {
1498    my ($self,%args) = @_;
1499    my $dbh = $self->dbh;
1500        my @nodes;
1501
1502        if($args{with_details}) {
1503                my $sql = "SELECT id, name, version, moderate FROM node;";
1504                my $sth = $dbh->prepare( $sql );
1505                $sth->execute();
1506
1507                while(my @results = $sth->fetchrow_array) {
1508                        my %data;
1509                        @data{ qw( node_id name version moderate ) } = @results;
1510                        push @nodes, \%data;
1511                }
1512        } else {
1513                my $sql = "SELECT name FROM node;";
1514                my $raw_nodes = $dbh->selectall_arrayref($sql); 
1515                @nodes = ( map { $self->charset_decode( $_->[0] ) } (@$raw_nodes) );
1516        }
1517        return @nodes;
1518}
1519
1520=item B<list_node_all_versions>
1521
1522  my @all_versions = $store->list_node_all_versions(
1523      name => 'HomePage',
1524      with_content => 1,
1525      with_metadata => 0
1526  );
1527
1528Returns all the versions of a node, optionally including the content
1529and metadata, as an array of hashes (newest versions first).
1530
1531=cut
1532
1533sub list_node_all_versions {
1534    my ($self, %args) = @_;
1535
1536    my ($node_id,$name,$with_content,$with_metadata) = 
1537                @args{ qw( node_id name with_content with_metadata ) };
1538
1539    my $dbh = $self->dbh;
1540    my $sql;
1541
1542    # If they only gave us the node name, get the node id
1543    unless ($node_id) {
1544        $sql = "SELECT id FROM node WHERE name=" . $dbh->quote($name);
1545        $node_id = $dbh->selectrow_array($sql);
1546    }
1547
1548    # If they didn't tell us what they wanted / we couldn't find it,
1549    #  return an empty array
1550    return () unless($node_id);
1551
1552    # Build up our SQL
1553    $sql = "SELECT id, name, content.version, content.modified ";
1554    if ( $with_content ) {
1555        $sql .= ", content.text ";
1556    }
1557    if ( $with_metadata ) {
1558        $sql .= ", metadata_type, metadata_value ";
1559    }
1560    $sql .= " FROM node INNER JOIN content ON (id = content.node_id) ";
1561    if ( $with_metadata ) {
1562        $sql .= " LEFT OUTER JOIN metadata ON "
1563           . "(id = metadata.node_id AND content.version = metadata.version) ";
1564    }
1565    $sql .= " WHERE id = ? ORDER BY content.version DESC";
1566
1567    # Do the fetch
1568    my $sth = $dbh->prepare( $sql );
1569    $sth->execute( $node_id );
1570
1571    # Need to hold onto the last row by hash ref, so we don't trash
1572    #  it every time
1573    my %first_data;
1574    my $dataref = \%first_data;
1575
1576    # Haul out the data
1577    my @versions;
1578    while ( my @results = $sth->fetchrow_array ) {
1579        my %data = %$dataref;
1580
1581        # Is it the same version as last time?
1582        if ( %data && $data{'version'} != $results[2] ) {
1583            # New version
1584            push @versions, $dataref;
1585            %data = ();
1586        } else {
1587            # Same version as last time, must be more metadata
1588        }
1589
1590        # Grab the core data (will be the same on multi-row for metadata)
1591        @data{ qw( node_id name version last_modified ) } = @results;
1592
1593        my $i = 4;
1594        if ( $with_content ) {
1595            $data{'content'} = $results[$i];
1596            $i++;
1597        }
1598        if ( $with_metadata ) {
1599            my ($m_type,$m_value) = @results[$i,($i+1)];
1600            unless ( $data{'metadata'} ) { $data{'metadata'} = {}; }
1601
1602            if ( $m_type ) {
1603                # If we have existing data, then put it into an array
1604                if ( $data{'metadata'}->{$m_type} ) {
1605                    unless ( ref($data{'metadata'}->{$m_type}) eq "ARRAY" ) {
1606                        $data{'metadata'}->{$m_type} =
1607                                             [ $data{'metadata'}->{$m_type} ];
1608                    }
1609                    push @{$data{'metadata'}->{$m_type}}, $m_value;
1610                } else {
1611                    # Otherwise, just store it in a normal string
1612                    $data{'metadata'}->{$m_type} = $m_value;
1613                }
1614            }
1615        }
1616
1617        # Save where we've got to
1618        $dataref = \%data;
1619    }
1620
1621    # Handle final row saving
1622    if ( $dataref ) {
1623        push @versions, $dataref;
1624    }
1625
1626    # Return
1627    return @versions;
1628}
1629
1630=item B<list_nodes_by_metadata>
1631
1632  # All documentation nodes.
1633  my @nodes = $store->list_nodes_by_metadata(
1634      metadata_type  => "category",
1635      metadata_value => "documentation",
1636      ignore_case    => 1,   # optional but recommended (see below)
1637  );
1638
1639  # All pubs in Hammersmith.
1640  my @pubs = $store->list_nodes_by_metadata(
1641      metadata_type  => "category",
1642      metadata_value => "Pub",
1643  );
1644  my @hsm  = $store->list_nodes_by_metadata(
1645      metadata_type  => "category",
1646      metadata_value  => "Hammersmith",
1647  );
1648  my @results = my_l33t_method_for_ANDing_arrays( \@pubs, \@hsm );
1649
1650Returns a list containing the name of every node whose caller-supplied
1651metadata matches the criteria given in the parameters.
1652
1653By default, the case-sensitivity of both C<metadata_type> and
1654C<metadata_value> depends on your database - if it will return rows
1655with an attribute value of "Pubs" when you asked for "pubs", or not.
1656If you supply a true value to the C<ignore_case> parameter, then you
1657can be sure of its being case-insensitive.  This is recommended.
1658
1659If you don't supply any criteria then you'll get an empty list.
1660
1661This is a really really really simple way of finding things; if you
1662want to be more complicated then you'll need to call the method
1663multiple times and combine the results yourself, or write a plugin.
1664
1665=cut
1666
1667sub list_nodes_by_metadata {
1668    my ($self, %args) = @_;
1669    my ( $type, $value ) = @args{ qw( metadata_type metadata_value ) };
1670    return () unless $type;
1671
1672    my $dbh = $self->dbh;
1673    if ( $args{ignore_case} ) {
1674        $type  = lc( $type  );
1675        $value = lc( $value );
1676    }
1677    my $sql =
1678         $self->_get_list_by_metadata_sql( ignore_case => $args{ignore_case} );
1679    my $sth = $dbh->prepare( $sql );
1680    $sth->execute( $type, $self->charset_encode($value) );
1681    my @nodes;
1682    while ( my ($id, $node) = $sth->fetchrow_array ) {
1683        push @nodes, $node;
1684    }
1685    return @nodes;
1686}
1687
1688=item B<list_nodes_by_missing_metadata>
1689Returns nodes where either the metadata doesn't exist, or is blank
1690
1691Unlike list_nodes_by_metadata(), the metadata value is optional.
1692
1693  # All nodes missing documentation
1694  my @nodes = $store->list_nodes_by_missing_metadata(
1695      metadata_type  => "category",
1696      metadata_value => "documentation",
1697      ignore_case    => 1,   # optional but recommended (see below)
1698  );
1699
1700  # All nodes which don't have a latitude defined
1701  my @nodes = $store->list_nodes_by_missing_metadata(
1702      metadata_type  => "latitude"
1703  );
1704
1705=cut
1706
1707sub list_nodes_by_missing_metadata {
1708    my ($self, %args) = @_;
1709    my ( $type, $value ) = @args{ qw( metadata_type metadata_value ) };
1710    return () unless $type;
1711
1712    my $dbh = $self->dbh;
1713    if ( $args{ignore_case} ) {
1714        $type  = lc( $type  );
1715        $value = lc( $value );
1716    }
1717
1718        my @nodes;
1719
1720        # If the don't want to match by value, then we can do it with
1721        #  a LEFT OUTER JOIN, and either NULL or LENGTH() = 0
1722        if( ! $value ) {
1723                my $sql = $self->_get_list_by_missing_metadata_sql( 
1724                                                                                ignore_case => $args{ignore_case}
1725                      );
1726                my $sth = $dbh->prepare( $sql );
1727                $sth->execute( $type );
1728
1729                while ( my ($id, $node) = $sth->fetchrow_array ) {
1730                push @nodes, $node;
1731                }
1732    } else {
1733                # To find those without the value in this case would involve
1734                #  some seriously brain hurting SQL.
1735                # So, cheat - find those with, and return everything else
1736                my @with = $self->list_nodes_by_metadata(%args);
1737                my %with_hash;
1738                foreach my $node (@with) { $with_hash{$node} = 1; }
1739
1740                my @all_nodes = $self->list_all_nodes();
1741                foreach my $node (@all_nodes) {
1742                        unless($with_hash{$node}) {
1743                                push @nodes, $node;
1744                        }
1745                }
1746        }
1747
1748    return @nodes;
1749}
1750
1751=item B<_get_list_by_metadata_sql>
1752
1753Return the SQL to do a match by metadata. Should expect the metadata type
1754as the first SQL parameter, and the metadata value as the second.
1755
1756If possible, should take account of $args{ignore_case}
1757
1758=cut
1759
1760sub _get_list_by_metadata_sql {
1761        # SQL 99 version
1762    #  Can be over-ridden by database-specific subclasses
1763    my ($self, %args) = @_;
1764    if ( $args{ignore_case} ) {
1765        return "SELECT node.id, node.name "
1766             . "FROM node "
1767             . "INNER JOIN metadata "
1768             . "   ON (node.id = metadata.node_id "
1769             . "       AND node.version=metadata.version) "
1770             . "WHERE ". $self->_get_lowercase_compare_sql("metadata.metadata_type")
1771             . " AND ". $self->_get_lowercase_compare_sql("metadata.metadata_value");
1772    } else {
1773        return "SELECT node.id, node.name "
1774             . "FROM node "
1775             . "INNER JOIN metadata "
1776             . "   ON (node.id = metadata.node_id "
1777             . "       AND node.version=metadata.version) "
1778             . "WHERE ". $self->_get_casesensitive_compare_sql("metadata.metadata_type")
1779             . " AND ". $self->_get_casesensitive_compare_sql("metadata.metadata_value");
1780    }
1781}
1782
1783=item B<_get_list_by_missing_metadata_sql>
1784
1785Return the SQL to do a match by missing metadata. Should expect the metadata
1786type as the first SQL parameter.
1787
1788If possible, should take account of $args{ignore_case}
1789
1790=cut
1791
1792sub _get_list_by_missing_metadata_sql {
1793        # SQL 99 version
1794    #  Can be over-ridden by database-specific subclasses
1795    my ($self, %args) = @_;
1796
1797        my $sql = "";
1798    if ( $args{ignore_case} ) {
1799        $sql = "SELECT node.id, node.name "
1800             . "FROM node "
1801             . "LEFT OUTER JOIN metadata "
1802             . "   ON (node.id = metadata.node_id "
1803             . "       AND node.version=metadata.version "
1804             . "       AND ". $self->_get_lowercase_compare_sql("metadata.metadata_type")
1805                         . ")";
1806    } else {
1807        $sql = "SELECT node.id, node.name "
1808             . "FROM node "
1809             . "LEFT OUTER JOIN metadata "
1810             . "   ON (node.id = metadata.node_id "
1811             . "       AND node.version=metadata.version "
1812             . "       AND ". $self->_get_casesensitive_compare_sql("metadata.metadata_type")
1813             . ")";
1814    }
1815
1816        $sql .= "WHERE (metadata.metadata_value IS NULL OR LENGTH(metadata.metadata_value) = 0) ";
1817        return $sql;
1818}
1819
1820sub _get_lowercase_compare_sql {
1821        my ($self, $column) = @_;
1822        # SQL 99 version
1823    #  Can be over-ridden by database-specific subclasses
1824        return "lower($column) = ?";
1825}
1826sub _get_casesensitive_compare_sql {
1827        my ($self, $column) = @_;
1828        # SQL 99 version
1829    #  Can be over-ridden by database-specific subclasses
1830        return "$column = ?";
1831}
1832
1833sub _get_comparison_sql {
1834    my ($self, %args) = @_;
1835        # SQL 99 version
1836    #  Can be over-ridden by database-specific subclasses
1837    return "$args{thing1} = $args{thing2}";
1838}
1839
1840sub _get_node_exists_ignore_case_sql {
1841        # SQL 99 version
1842    #  Can be over-ridden by database-specific subclasses
1843    return "SELECT name FROM node WHERE name = ? ";
1844}
1845
1846=item B<list_unmoderated_nodes>
1847
1848  my @nodes = $wiki->list_unmoderated_nodes();
1849  my @nodes = $wiki->list_unmoderated_nodes(
1850                                                only_where_latest => 1
1851                                            );
1852
1853  $nodes[0]->{'name'}              # The name of the node
1854  $nodes[0]->{'node_id'}           # The id of the node
1855  $nodes[0]->{'version'}           # The version in need of moderation
1856  $nodes[0]->{'moderated_version'} # The newest moderated version
1857
1858  With only_where_latest set, return the id, name and version of all the
1859   nodes where the most recent version needs moderation.
1860  Otherwise, returns the id, name and version of all node versions that need
1861   to be moderated.
1862
1863=cut
1864
1865sub list_unmoderated_nodes {
1866        my ($self,%args) = @_;
1867
1868        my $only_where_lastest = $args{'only_where_latest'};
1869
1870        my $sql =
1871                 "SELECT "
1872                ."      id, name, "
1873                ."      node.version AS last_moderated_version, "
1874                ."      content.version AS version "
1875                ."FROM content "
1876                ."INNER JOIN node "
1877                ."      ON (id = node_id) "
1878                ."WHERE moderated = ? "
1879        ;
1880        if($only_where_lastest) {
1881                $sql .= "AND node.version = content.version ";
1882        }
1883        $sql .= "ORDER BY name, content.version ";
1884
1885        # Query
1886    my $dbh = $self->dbh;
1887    my $sth = $dbh->prepare( $sql );
1888    $sth->execute( "0" );
1889
1890        my @nodes;
1891        while(my @results = $sth->fetchrow_array) {
1892                my %data;
1893                @data{ qw( node_id name moderated_version version ) } = @results;
1894                push @nodes, \%data;
1895        }
1896
1897        return @nodes;
1898}
1899
1900=item B<list_last_version_before>
1901
1902    List the last version of every node before a given date.
1903    If no version existed before that date, will return undef for version.
1904    Returns a hash of id, name, version and date
1905
1906    my @nv = $wiki->list_last_version_before('2007-01-02 10:34:11')
1907    foreach my $data (@nv) {
1908       
1909    }
1910
1911=cut
1912
1913sub list_last_version_before {
1914        my ($self, $date) = @_;
1915
1916        my $sql =
1917                 "SELECT "
1918                ."      id, name, "
1919                ."MAX(content.version) AS version, MAX(content.modified) AS modified "
1920                ."FROM node "
1921                ."LEFT OUTER JOIN content "
1922                ."      ON (id = node_id "
1923                ."      AND content.modified <= ?) "
1924                ."GROUP BY id, name "
1925                ."ORDER BY id "
1926        ;
1927
1928        # Query
1929    my $dbh = $self->dbh;
1930    my $sth = $dbh->prepare( $sql );
1931    $sth->execute( $date );
1932
1933        my @nodes;
1934        while(my @results = $sth->fetchrow_array) {
1935                my %data;
1936                @data{ qw( id name version modified ) } = @results;
1937                $data{'node_id'} = $data{'id'};
1938                unless($data{'version'}) { $data{'version'} = undef; }
1939                push @nodes, \%data;
1940        }
1941
1942        return @nodes;
1943}
1944
1945=item B<list_metadata_by_type>
1946
1947    List all the currently defined values of the given type of metadata.
1948
1949    Will only work with the latest moderated version
1950
1951    # List all of the different metadata values with the type 'category'
1952    my @categories = $wiki->list_metadata_by_type('category');
1953
1954=cut
1955
1956sub list_metadata_by_type {
1957        my ($self, $type) = @_;
1958
1959        return 0 unless $type;
1960}
1961
1962
1963=item B<schema_current>
1964
1965  my ($code_version, $db_version) = $store->schema_current;
1966  if ($code_version == $db_version)
1967      # Do stuff
1968  } else {
1969      # Bail
1970  }
1971
1972=cut
1973
1974sub schema_current {
1975    my $self = shift;
1976    my $dbh = $self->dbh;
1977    my $sth;
1978    eval { $sth = $dbh->prepare("SELECT version FROM schema_info") };
1979    if ($@) {
1980        return ($SCHEMA_VER, 0);
1981    }
1982    eval { $sth->execute };
1983    if ($@) {
1984        return ($SCHEMA_VER, 0);
1985    }
1986    my $version;
1987    eval { $version = $sth->fetchrow_array };
1988    if ($@) {
1989        return ($SCHEMA_VER, 0);
1990    } else {
1991        return ($SCHEMA_VER, $version);
1992    }
1993}
1994
1995
1996=item B<dbh>
1997
1998  my $dbh = $store->dbh;
1999
2000Returns the database handle belonging to this storage backend instance.
2001
2002=cut
2003
2004sub dbh {
2005    my $self = shift;
2006    return $self->{_dbh};
2007}
2008
2009=item B<dbname>
2010
2011  my $dbname = $store->dbname;
2012
2013Returns the name of the database used for backend storage.
2014
2015=cut
2016
2017sub dbname {
2018    my $self = shift;
2019    return $self->{_dbname};
2020}
2021
2022=item B<dbuser>
2023
2024  my $dbuser = $store->dbuser;
2025
2026Returns the username used to connect to the database used for backend storage.
2027
2028=cut
2029
2030sub dbuser {
2031    my $self = shift;
2032    return $self->{_dbuser};
2033}
2034
2035=item B<dbpass>
2036
2037  my $dbpass = $store->dbpass;
2038
2039Returns the password used to connect to the database used for backend storage.
2040
2041=cut
2042
2043sub dbpass {
2044    my $self = shift;
2045    return $self->{_dbpass};
2046}
2047
2048=item B<dbhost>
2049
2050  my $dbhost = $store->dbhost;
2051
2052Returns the optional host used to connect to the database used for
2053backend storage.
2054
2055=cut
2056
2057sub dbhost {
2058    my $self = shift;
2059    return $self->{_dbhost};
2060}
2061
2062# Cleanup.
2063sub DESTROY {
2064    my $self = shift;
2065    return if $self->{_external_dbh};
2066    my $dbh = $self->dbh;
2067    $dbh->disconnect if $dbh;
2068}
2069
2070# decode a string of octets into perl's internal encoding, based on the
2071# charset parameter we were passed. Takes a list, returns a list.
2072sub charset_decode {
2073  my $self = shift;
2074  my @input = @_;
2075  if ($CAN_USE_ENCODE) {
2076    my @output;
2077    for (@input) {
2078      push( @output, Encode::decode( $self->{_charset}, $_ ) );
2079    }
2080    return @output;
2081  }
2082  return @input;
2083}
2084
2085# convert a perl string into a series of octets we can put into the database
2086# takes a list, returns a list
2087sub charset_encode {
2088  my $self = shift;
2089  my @input = @_;
2090  if ($CAN_USE_ENCODE) {
2091    my @output;
2092    for (@input) {
2093      push( @output, Encode::encode( $self->{_charset}, $_ ) );
2094    }
2095    return @output;
2096  }
2097  return @input;
2098}
2099
21001;
Note: See TracBrowser for help on using the repository browser.