module msgpack.streaming_unpacker;

import msgpack.common;
import msgpack.attribute;
import msgpack.exception;
import msgpack.value;

import std.array;
import std.exception;
import std.range;
import std.stdio;
import std.traits;
import std.typecons;
import std.typetuple;
import std.container;


/**
 * $(D Unpacked) is a $(D Range) wrapper for stream deserialization result
 *
 * The range primitives below operate on $(D value.via.array); in debug builds
 * they enforce that the wrapped value is an array.
 */
struct Unpacked
{
    import std.conv : text;

    Value value;  /// deserialized value

    alias value this;


    /**
     * Constructs a $(D Unpacked) with argument.
     *
     * Params:
     *  value = a deserialized value.
     */
    @safe
    this(ref Value value)
    {
        this.value = value;
    }


    /**
     * InputRange primitive operation that checks iteration state.
     *
     * Returns:
     *  true if the wrapped value is an array with no remaining elements.
     *  A non-array value always yields false.
     */
    @property @trusted
    nothrow bool empty() const  // std.array.empty isn't nothrow function
    {
        return (value.type == Value.Type.array) && !value.via.array.length;
    }


    /**
     * Range primitive operation that returns the length of the range.
     *
     * Returns:
     *  the number of values.
     */
    @property @trusted
    size_t length()
    {
        debug enforce(value.type == Value.Type.array, "lenght is called with non array object. type = " ~ text(value.type));
        return value.via.array.length;
    }


    /**
     * InputRange primitive operation that returns the currently iterated element.
     *
     * Returns:
     *  the deserialized $(D Value).
     */
    @property @trusted
    ref Value front()
    {
        debug enforce(value.type == Value.Type.array, "front is called with non array object. type = " ~ text(value.type));
        return value.via.array.front;
    }


    /**
     * InputRange primitive operation that advances the range to its next element.
     */
    @trusted
    void popFront()
    {
        debug enforce(value.type == Value.Type.array, "popFront is called with non array object. type = " ~ text(value.type));
        value.via.array.popFront();
    }

    /**
     * RandomAccessRange primitive operation.
     *
     * Returns:
     *  the deserialized $(D Value) at $(D_PARAM n) position.
     */
    @trusted
    ref Value opIndex(size_t n)
    {
        debug enforce(value.type == Value.Type.array, "opIndex is called with non array object. type = " ~ text(value.type));
        return value.via.array[n];
    }

    /**
     * Returns a slice of the range.
     *
     * Params:
     *  from = the start point of slicing.
     *  to   = the end point of slicing.
     *
     * Returns:
     *  the slice of Values.
     */
    @trusted
    Value[] opSlice(size_t from, size_t to)
    {
        debug enforce(value.type == Value.Type.array, "opSlice is called with non array object. type = " ~ text(value.type));
        return value.via.array[from..to];
    }

    /**
     * Range primitive operation that returns the snapshot.
     *
     * Returns:
     *  the snapshot of this Value.
     */
    @property @safe
    Unpacked save()
    {
        return Unpacked(value);
    }
}


unittest
{
    static assert(isForwardRange!Unpacked);
    static assert(hasLength!Unpacked);
}


/**
 * This $(D StreamingUnpacker) is a $(D MessagePack) streaming deserializer
 *
 * This implementation enables you to load multiple objects from a stream(like network).
 *
 * Example:
 * -----
 * ...
 * auto unpacker = StreamingUnpacker(serializedData);
 * ...
 *
 * // appends new data to buffer if pre execute() call didn't finish deserialization.
 * unpacker.feed(newSerializedData);
 *
 * while (unpacker.execute()) {
 *     foreach (obj; unpacker.purge()) {
 *         // do stuff (obj is a Value)
 *     }
 * }
 *
 * if (unpacker.size)
 *     throw new Exception("Message is too large");
 * -----
 */
struct StreamingUnpacker
{
  private:
    /*
     * Context state of deserialization
     *
     * For the formats that carry a size or payload after the header byte the
     * state is obtained in execute() as (header & 0x1f), so the numeric values
     * below must stay in sync with the Format constants.
     */
    enum State
    {
        HEADER = 0x00,

        BIN8 = 0x04,
        BIN16,
        BIN32,

        // Floating point, Unsigned, Signed integer (== header & 0x03)
        FLOAT = 0x0a,
        DOUBLE,
        UINT8,
        UINT16,
        UINT32,
        UINT64,
        INT8,
        INT16,
        INT32,
        INT64,

        // Container (== header & 0x01)
        STR8 = 0x19,
        RAW16 = 0x1a,
        RAW32,
        ARRAY16,
        ARRAY32,
        MAP16,
        MAP32,
        RAW,

        // EXT family
        EXT8,
        EXT16,
        EXT32,
        EXT_DATA,

        // D-specific type
        REAL
    }


    /*
     * Element type of container
     */
    enum ContainerElement
    {
        ARRAY_ITEM,
        MAP_KEY,
        MAP_VALUE
    }


    /*
     * Internal stack context
     */
    static struct Context
    {
        static struct Container
        {
            ContainerElement type;   // value container type
            Value            value;  // current value
            Value            key;    // for map value
            size_t           count;  // container length
        }

        State       state;  // current state of deserialization
        size_t      trail;  // current deserializing size
        size_t      top;    // current index of stack
        Container[] stack;  // storing values
    }

    Context context_;  // stack environment for streaming deserialization

    mixin InternalBuffer;


  public:
    /**
     * Constructs a $(D StreamingUnpacker).
     *
     * Params:
     *  target     = byte buffer to deserialize
     *  bufferSize = size limit of buffer size
     */
    @safe
    this(in ubyte[] target, in size_t bufferSize = 8192)
    {
        initializeBuffer(target, bufferSize);
        initializeContext();
    }


    /**
     * Forwards to deserialized object.
     *
     * Returns:
     *  the $(D Unpacked) object contains deserialized value.
     */
    @property @safe
    Unpacked unpacked()
    {
        return Unpacked(context_.stack[0].value);
    }


    /**
     * Clears some states for next deserialization.
     */
    @safe
    nothrow void clear()
    {
        initializeContext();

        parsed_ = 0;
    }


    /**
     * Convenient method for unpacking and clearing states.
     *
     * Example:
     * -----
     * foreach (obj; unpacker.purge()) {
     *     // do stuff
     * }
     * -----
     * is equivalent to
     * -----
     * foreach (obj; unpacker.unpacked) {
     *     // do stuff
     * }
     * unpacker.clear();
     * -----
     *
     * Returns:
     *  the $(D Unpacked) object contains deserialized value.
     */
    @safe
    Unpacked purge()
    {
        auto result = Unpacked(context_.stack[0].value);

        clear();

        return result;
    }


    /**
     * Executes deserialization.
     *
     * Parsing resumes from the state saved by the previous call and stops as
     * soon as one complete top-level object has been built, or when the
     * buffered data runs out.
     *
     * Returns:
     *  true if deserialization has been completed, otherwise false.
     *
     * Throws:
     *  $(D UnpackException) when parse error occurs.
     */
    bool execute()
    {
        /*
         * Current implementation is very dirty(goto! goto!! goto!!!).
         * This Complexity for performance(avoid function call).
         */

        bool   ret;
        size_t cur = offset_;
        Value  obj;

        // restores before state
        auto state =  context_.state;
        auto trail =  context_.trail;
        auto top   =  context_.top;
        auto stack = &context_.stack;

        /*
         * Helper for container deserialization
         *
         * Initializes the container value at the current stack slot and pushes
         * a fresh slot for its elements.
         */
        void startContainer(string Type)(ContainerElement type, size_t length)
        {
            mixin("callback" ~ Type ~ "((*stack)[top].value, length);");

            (*stack)[top].type  = type;
            (*stack)[top].count = length;
            (*stack).length     = ++top + 1;
        }

        // non-deserialized data is nothing
        if (used_ - offset_ == 0)
            goto Labort;

        do {
          Lstart:
            if (state == State.HEADER) {
                const header = buffer_[cur];

                if (0x00 <= header && header <= 0x7f) {         // positive
                    callbackUInt(obj, header);
                    goto Lpush;
                } else if (0xe0 <= header && header <= 0xff) {  // negative
                    callbackInt(obj, cast(byte)header);
                    goto Lpush;
                } else if (0xa0 <= header && header <= 0xbf) {  // fix raw
                    trail = header & 0x1f;
                    if (trail == 0) {
                        // An empty raw has no payload to wait for. Deferring it to
                        // the RAW state would leave it pending forever when this
                        // header is the last buffered byte, because the loop
                        // condition below fails before the RAW state is visited.
                        hasRaw_ = true;
                        callbackRaw(obj, buffer_[cur..cur]);
                        goto Lpush;
                    }
                    state = State.RAW;
                    cur++;
                    continue;
                } else if (0xd4 <= header && header <= 0xd8) {  // fix ext
                    trail = 2 ^^ (header - 0xd4) + 1;  // payload size plus one byte for the ext type
                    state = State.EXT_DATA;
                    cur++;
                    continue;
                } else if (0x90 <= header && header <= 0x9f) {  // fix array
                    size_t length = header & 0x0f;
                    if (length == 0) {
                        callbackArray(obj, 0);
                        goto Lpush;
                    } else {
                        startContainer!"Array"(ContainerElement.ARRAY_ITEM, length);
                        cur++;
                        continue;
                    }
                } else if (0x80 <= header && header <= 0x8f) {  // fix map
                    size_t length = header & 0x0f;
                    if (length == 0) {
                        callbackMap(obj, 0);
                        goto Lpush;
                    } else {
                        startContainer!"Map"(ContainerElement.MAP_KEY, length);
                        cur++;
                        continue;
                    }
                } else {
                    switch (header) {
                    case Format.UINT8, Format.UINT16, Format.UINT32, Format.UINT64,
                         Format.INT8, Format.INT16, Format.INT32, Format.INT64,
                         Format.FLOAT, Format.DOUBLE:
                        trail = 1 << (header & 0x03);  // computes object size
                        state = cast(State)(header & 0x1f);
                        break;
                    case Format.REAL:
                        trail = RealSize;
                        state = State.REAL;
                        break;
                    case Format.ARRAY16, Format.ARRAY32,
                         Format.MAP16, Format.MAP32:
                        trail = 2 << (header & 0x01);  // computes container size
                        state = cast(State)(header & 0x1f);
                        break;
                    // raw will become str format in new spec
                    case Format.STR8:
                    case Format.RAW16: // will be STR16
                    case Format.RAW32: // will be STR32
                        trail = 1 << ((header & 0x03) - 1);  // computes container size
                        state = cast(State)(header & 0x1f);
                        break;
                    case Format.BIN8, Format.BIN16, Format.BIN32:
                        trail = 1 << (header & 0x03);  // computes container size
                        state = cast(State)(header & 0x1f);
                        break;
                    case Format.EXT8:
                        trail = 1;
                        state = State.EXT8;
                        break;
                    case Format.EXT16:
                        trail = 2;
                        state = State.EXT16;
                        break;
                    case Format.EXT32:
                        trail = 4;
                        state = State.EXT32;
                        break;
                    case Format.NIL:
                        callbackNil(obj);
                        goto Lpush;
                    case Format.TRUE:
                        callbackBool(obj, true);
                        goto Lpush;
                    case Format.FALSE:
                        callbackBool(obj, false);
                        goto Lpush;
                    default:
                        throw new UnpackException("Unknown type");
                    }

                    cur++;
                    goto Lstart;
                }
            } else {
                // data lack for deserialization
                if (used_ - cur < trail)
                    goto Labort;

                const base = cur; cur += trail - 1;  // fix current position

                final switch (state) {
                case State.FLOAT:
                    _f temp;

                    temp.i = load32To!uint(buffer_[base..base + trail]);
                    callbackFloat(obj, temp.f);
                    goto Lpush;
                case State.DOUBLE:
                    _d temp;

                    temp.i = load64To!ulong(buffer_[base..base + trail]);
                    callbackFloat(obj, temp.f);
                    goto Lpush;
                case State.REAL:
                    const expb = base + ulong.sizeof;

                    version (NonX86)
                    {
                        CustomFloat!80 temp;

                        const frac = load64To!ulong (buffer_[base..expb]);
                        const exp  = load16To!ushort(buffer_[expb..expb + ushort.sizeof]);

                        temp.significand = frac;
                        temp.exponent    = exp & 0x7fff;
                        temp.sign        = exp & 0x8000 ? true : false;

                        // NOTE: temp.get!real is inf on non-x86 when deserialized value is larger than double.max.
                        callbackFloat(obj, temp.get!real);
                    }
                    else
                    {
                        _r temp;

                        temp.fraction = load64To!(typeof(temp.fraction))(buffer_[base..expb]);
                        temp.exponent = load16To!(typeof(temp.exponent))(buffer_[expb..expb + temp.exponent.sizeof]);

                        callbackFloat(obj, temp.f);
                    }

                    goto Lpush;
                case State.UINT8:
                    callbackUInt(obj, buffer_[base]);
                    goto Lpush;
                case State.UINT16:
                    callbackUInt(obj, load16To!ushort(buffer_[base..base + trail]));
                    goto Lpush;
                case State.UINT32:
                    callbackUInt(obj, load32To!uint(buffer_[base..base + trail]));
                    goto Lpush;
                case State.UINT64:
                    callbackUInt(obj, load64To!ulong(buffer_[base..base + trail]));
                    goto Lpush;
                case State.INT8:
                    callbackInt(obj, cast(byte)buffer_[base]);
                    goto Lpush;
                case State.INT16:
                    callbackInt(obj, load16To!short(buffer_[base..base + trail]));
                    goto Lpush;
                case State.INT32:
                    callbackInt(obj, load32To!int(buffer_[base..base + trail]));
                    goto Lpush;
                case State.INT64:
                    callbackInt(obj, load64To!long(buffer_[base..base + trail]));
                    goto Lpush;
                case State.RAW: Lraw:
                    hasRaw_ = true;
                    callbackRaw(obj, buffer_[base..base + trail]);
                    goto Lpush;

                case State.EXT_DATA: Lext:
                    hasRaw_ = true;
                    // first byte is the ext type, the rest is the payload
                    obj.via.ext.type = buffer_[base];
                    callbackExt(obj, buffer_[base+1..base+trail]);
                    goto Lpush;
                // NOTE(review): in the three EXT cases below trail is "length + 1",
                // so the `trail == 0` checks can only fire if that addition wraps
                // around - confirm whether they are intentional.
                case State.EXT8:
                    trail = buffer_[base] + 1;
                    if (trail == 0)
                        goto Lext;
                    state = State.EXT_DATA;
                    cur++;
                    goto Lstart;
                case State.EXT16:
                    trail = load16To!size_t(buffer_[base..base+trail]) + 1;
                    if (trail == 0)
                        goto Lext;
                    state = State.EXT_DATA;
                    cur++;
                    goto Lstart;
                case State.EXT32:
                    trail = load32To!size_t(buffer_[base..base+trail]) + 1;
                    if (trail == 0)
                        goto Lext;
                    state = State.EXT_DATA;
                    cur++;
                    goto Lstart;

                case State.STR8, State.BIN8:
                    trail = buffer_[base];
                    if (trail == 0)
                        goto Lraw;
                    state = State.RAW;
                    cur++;
                    goto Lstart;
                case State.RAW16, State.BIN16:
                    trail = load16To!size_t(buffer_[base..base + trail]);
                    if (trail == 0)
                        goto Lraw;
                    state = State.RAW;
                    cur++;
                    goto Lstart;
                case State.RAW32, State.BIN32:
                    trail = load32To!size_t(buffer_[base..base + trail]);
                    if (trail == 0)
                        goto Lraw;
                    state = State.RAW;
                    cur++;
                    goto Lstart;
                case State.ARRAY16:
                    size_t length = load16To!size_t(buffer_[base..base + trail]);
                    if (length == 0) {
                        callbackArray(obj, 0);
                        goto Lpush;
                    } else {
                        startContainer!"Array"(ContainerElement.ARRAY_ITEM, length);
                        state = State.HEADER;
                        cur++;
                        continue;
                    }
                case State.ARRAY32:
                    size_t length = load32To!size_t(buffer_[base..base + trail]);
                    if (length == 0) {
                        callbackArray(obj, 0);
                        goto Lpush;
                    } else {
                        startContainer!"Array"(ContainerElement.ARRAY_ITEM, length);
                        state = State.HEADER;
                        cur++;
                        continue;
                    }
                case State.MAP16:
                    size_t length = load16To!size_t(buffer_[base..base + trail]);
                    if (length == 0) {
                        callbackMap(obj, 0);
                        goto Lpush;
                    } else {
                        startContainer!"Map"(ContainerElement.MAP_KEY, length);
                        state = State.HEADER;
                        cur++;
                        continue;
                    }
                case State.MAP32:
                    size_t length = load32To!size_t(buffer_[base..base + trail]);
                    if (length == 0) {
                        callbackMap(obj, 0);
                        goto Lpush;
                    } else {
                        startContainer!"Map"(ContainerElement.MAP_KEY, length);
                        state = State.HEADER;
                        cur++;
                        continue;
                    }
                case State.HEADER:
                    break;
                }
            }

          Lpush:
            // obj is complete: it is either the top-level result or an element
            // of the innermost open container.
            if (top == 0)
                goto Lfinish;

            auto container = &(*stack)[top - 1];

            final switch (container.type) {
            case ContainerElement.ARRAY_ITEM:
                container.value.via.array ~= obj;
                if (--container.count == 0) {
                    // container is full: it becomes the finished object itself
                    obj = container.value;
                    top--;
                    goto Lpush;
                }
                break;
            case ContainerElement.MAP_KEY:
                container.key  = obj;
                container.type = ContainerElement.MAP_VALUE;
                break;
            case ContainerElement.MAP_VALUE:
                container.value.via.map[container.key] = obj;
                if (--container.count == 0) {
                    // container is full: it becomes the finished object itself
                    obj = container.value;
                    top--;
                    goto Lpush;
                }
                container.type = ContainerElement.MAP_KEY;
            }

            state = State.HEADER;
            cur++;
        } while (cur < used_);

        goto Labort;

      Lfinish:
        (*stack)[0].value = obj;
        ret = true;
        cur++;
        goto Lend;

      Labort:
        ret = false;

      Lend:
        // saves the state so that the next call can resume from here
        context_.state = state;
        context_.trail = trail;
        context_.top   = top;
        parsed_       += cur - offset_;
        offset_        = cur;

        return ret;
    }


    /**
     * supports foreach. One loop provides $(D Unpacked) object contains execute() result.
     * This is convenient in case that $(D MessagePack) values are continuous.
     */
    int opApply(scope int delegate(ref Unpacked) dg)
    {
        int result;

        while (execute()) {
            auto unpackedResult = Unpacked(context_.stack[0].value);
            result = dg(unpackedResult);
            if (result)
                break;

            clear();
        }

        return result;
    }


  private:
    /*
     * initializes internal stack environment.
     */
    @safe
    nothrow void initializeContext()
    {
        context_.state        = State.HEADER;
        context_.trail        = 0;
        context_.top          = 0;
        context_.stack.length = 1;
    }
}


unittest
{
    import msgpack.packer;

    {
        // serialize
        mixin DefinePacker;

        packer.packArray(null, true, 1, -2, "Hi!", [1], [1:1], double.max, ExtValue(7, [1,2,3,4]));

        // deserialize
        auto unpacker = StreamingUnpacker(packer.stream.data); unpacker.execute();
        auto unpacked = unpacker.purge();

        // Range test
        foreach (unused; 0..2) {
            uint i;

            foreach (obj; unpacked)
                i++;

            assert(i == unpacked.via.array.length);
        }

        auto result = unpacked.via.array;

        assert(result[0].type          == Value.Type.nil);
        assert(result[1].via.boolean   == true);
        assert(result[2].via.uinteger  == 1);
        assert(result[3].via.integer   == -2);
        assert(result[4].via.raw       == [72, 105, 33]);
        assert(result[5].as!(int[])    == [1]);
        assert(result[6].as!(int[int]) == [1:1]);
        assert(result[7].as!(double)   == double.max);
        assert(result[8].as!(ExtValue) == ExtValue(7, [1,2,3,4]));
    }

    // Test many combinations of EXT
    {
        mixin DefinePacker;

        alias Lengths = TypeTuple!(0, 1, 2, 3, 4, 5, 8, 15, 16, 31,
                                   255, 256, 2^^16, 2^^32);

        // Initialize a bunch of ExtValues and pack them
        ExtValue[Lengths.length] values;
        foreach (I, L; Lengths)
            values[I] = ExtValue(7, new ubyte[](L));
        packer.pack(values);

        auto unpacker = StreamingUnpacker(packer.stream.data); unpacker.execute();
        auto unpacked = unpacker.purge();

        // Compare unpacked values to originals
        size_t i = 0;
        foreach (deserialized; unpacked)
            assert(deserialized == values[i++]);
    }

    // An empty fix raw that is the very last byte of the buffer must complete
    {
        ubyte[] data = [0x92, 0xa1, 0x61, 0xa0];  // fix array of two fix raws: "a" and ""

        auto unpacker = StreamingUnpacker(data);
        assert(unpacker.execute());

        auto result = unpacker.purge().via.array;

        assert(result.length == 2);
        assert(result[0].via.raw == [0x61]);
        assert(result[1].type == Value.Type.raw);
        assert(result[1].via.raw.length == 0);
    }
}


private:
@trusted:


/**
 * Sets value type and value.
 *
 * Params:
 *  value  = the value to set
 *  number = the content to set
 */
void callbackUInt(ref Value value, ulong number)
{
    value.type         = Value.Type.unsigned;
    value.via.uinteger = number;
}


/// ditto
void callbackInt(ref Value value, long number)
{
    value.type        = Value.Type.signed;
    value.via.integer = number;
}


/// ditto
void callbackFloat(ref Value value, real number)
{
    value.type         = Value.Type.floating;
    value.via.floating = number;
}


/// ditto
void callbackRaw(ref Value value, ubyte[] raw)
{
    value.type    = Value.Type.raw;
    value.via.raw = raw;
}

/// ditto
void callbackExt(ref Value value, ubyte[] raw)
{
    // only the payload is stored here; the ext type field is left untouched
    value.type         = Value.Type.ext;
    value.via.ext.data = raw;
}

/// ditto
void callbackArray(ref Value value, size_t length)
{
    value.type = Value.Type.array;
    value.via.array.length = 0;
    value.via.array.reserve(length);
}


/// ditto
void callbackMap(ref Value value, lazy size_t length)
{
    // length is never evaluated; the map simply starts out empty
    value.type    = Value.Type.map;
    value.via.map = null;  // clears previous result avoiding 'Access Violation'
}


/// ditto
void callbackNil(ref Value value)
{
    value.type = Value.Type.nil;
}


/// ditto
void callbackBool(ref Value value, bool boolean)
{
    value.type        = Value.Type.boolean;
    value.via.boolean = boolean;
}


unittest
{
    Value value;

    // Unsigned integer
    callbackUInt(value, uint.max);
    assert(value.type         == Value.Type.unsigned);
    assert(value.via.uinteger == uint.max);

    // Signed integer
    callbackInt(value, int.min);
    assert(value.type        == Value.Type.signed);
    assert(value.via.integer == int.min);

    // Floating point
    callbackFloat(value, real.max);
    assert(value.type         == Value.Type.floating);
    assert(value.via.floating == real.max);

    // Raw
    callbackRaw(value, cast(ubyte[])[1]);
    assert(value.type    == Value.Type.raw);
    assert(value.via.raw == cast(ubyte[])[1]);

    // Array
    Value[] array; array.reserve(16);

    callbackArray(value, 16);
    assert(value.type               == Value.Type.array);
    assert(value.via.array.capacity == array.capacity);

    // Map
    Value[Value] map;

    callbackMap(value, 16);
    assert(value.type    == Value.Type.map);
    assert(value.via.map == null);

    // NIL
    callbackNil(value);
    assert(value.type == Value.Type.nil);

    // Bool
    callbackBool(value, true);
    assert(value.type        == Value.Type.boolean);
    assert(value.via.boolean == true);
}